我有一个单线程生产者,它创建一些任务对象,然后将它们添加到一个ArrayBlockingQueue
(固定大小)中。
我还启动了一个多线程消费者。这是作为固定线程池 ( Executors.newFixedThreadPool(threadCount);
) 构建的。然后我将一些 ConsumerWorker 实例提交给这个 threadPool,每个 ConsumerWorker 都有一个对上述 ArrayBlockingQueue 实例的引用。
每个这样的 Worker 都会take()
在队列上做一个并处理任务。
我的问题是,让工人知道什么时候没有更多工作要做的最好方法是什么。换句话说,我如何告诉Workers生产者已经完成加入队列,并且从这一点开始,每个worker看到队列为空时应该停止。
我现在得到的是一个设置,我的 Producer 使用回调初始化,当他完成它的工作(向队列中添加东西)时触发该回调。我还保留了我创建并提交到 ThreadPool 的所有 ConsumerWorkers 的列表。当生产者回调告诉我生产者完成时,我可以告诉每个工人。在这一点上,他们应该简单地继续检查队列是否不为空,当它变空时他们应该停止,从而允许我优雅地关闭 ExecutorService 线程池。是这样的
public class ConsumerWorker implements Runnable{
private BlockingQueue<Produced> inputQueue;
private volatile boolean isRunning = true;
public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
this.inputQueue = inputQueue;
}
@Override
public void run() {
//worker loop keeps taking en element from the queue as long as the producer is still running or as
//long as the queue is not empty:
while(isRunning || !inputQueue.isEmpty()) {
System.out.println("Consumer "+Thread.currentThread().getName()+" START");
try {
Object queueElement = inputQueue.take();
//process queueElement
} catch (Exception e) {
e.printStackTrace();
}
}
}
//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void setRunning(boolean isRunning) {
this.isRunning = isRunning;
}
}
这里的问题是我有一个明显的竞争条件,有时生产者会完成,发出信号,而 ConsumerWorkers 会在消耗队列中的所有内容之前停止。
我的问题是同步它的最佳方法是什么,以便一切正常?我是否应该同步检查生产者是否正在运行的整个部分,以及队列是否为空,并在一个块中从队列中取出一些东西(在队列对象上)?我应该只在 ConsumerWorker 实例上同步isRunning
布尔值的更新吗?还有什么建议吗?
更新,这是我最终使用的工作实现:
public class ConsumerWorker implements Runnable{
private BlockingQueue<Produced> inputQueue;
private final static Produced POISON = new Produced(-1);
public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
this.inputQueue = inputQueue;
}
@Override
public void run() {
//worker loop keeps taking en element from the queue as long as the producer is still running or as
//long as the queue is not empty:
while(true) {
System.out.println("Consumer "+Thread.currentThread().getName()+" START");
try {
Produced queueElement = inputQueue.take();
Thread.sleep(new Random().nextInt(100));
if(queueElement==POISON) {
break;
}
//process queueElement
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Consumer "+Thread.currentThread().getName()+" END");
}
}
//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void stopRunning() {
try {
inputQueue.put(POISON);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
这在很大程度上受到了 JohnVint 在下面的回答的启发,只做了一些小的修改。
=== 由于@vendhan 的评论而更新。
感谢您的关注。你是对的,这个问题中的第一段代码(在其他问题中)有一个while(isRunning || !inputQueue.isEmpty())
没有真正意义的代码段。
在我的实际最终实现中,我做了一些更接近您替换“||”的建议 (or) with "&&" (and),意思是每个工人(消费者)现在只检查他从列表中得到的元素是否是毒丸,如果是的话就停止(所以理论上我们可以说工人有正在运行且队列不能为空)。