72

我有一个单线程生产者,它创建一些任务对象,然后将它们添加到一个ArrayBlockingQueue(固定大小)中。

我还启动了一个多线程消费者。这是作为固定线程池 ( Executors.newFixedThreadPool(threadCount);) 构建的。然后我将一些 ConsumerWorker 实例提交给这个 threadPool,每个 ConsumerWorker 都有一个对上述 ArrayBlockingQueue 实例的引用。

每个这样的 Worker 都会take()在队列上做一个并处理任务。

我的问题是,让工人知道什么时候没有更多工作要做的最好方法是什么。换句话说,我如何告诉Workers生产者已经完成加入队列,并且从这一点开始,每个worker看到队列为空时应该停止。

我现在得到的是一个设置,我的 Producer 使用回调初始化,当他完成它的工作(向队列中添加东西)时触发该回调。我还保留了我创建并提交到 ThreadPool 的所有 ConsumerWorkers 的列表。当生产者回调告诉我生产者完成时,我可以告诉每个工人。在这一点上,他们应该简单地继续检查队列是否不为空,当它变空时他们应该停止,从而允许我优雅地关闭 ExecutorService 线程池。是这样的

public class ConsumerWorker implements Runnable{

private BlockingQueue<Produced> inputQueue;
private volatile boolean isRunning = true;

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(isRunning || !inputQueue.isEmpty()) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Object queueElement = inputQueue.take();
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void setRunning(boolean isRunning) {
    this.isRunning = isRunning;
}

}

这里的问题是我有一个明显的竞争条件,有时生产者会完成,发出信号,而 ConsumerWorkers 会在消耗队列中的所有内容之前停止。

我的问题是同步它的最佳方法是什么,以便一切正常?我是否应该同步检查生产者是否正在运行的整个部分,以及队列是否为空,并在一个块中从队列中取出一些东西(在队列对象上)?我应该只在 ConsumerWorker 实例上同步isRunning布尔值的更新吗?还有什么建议吗?

更新,这是我最终使用的工作实现:

public class ConsumerWorker implements Runnable{

private BlockingQueue<Produced> inputQueue;

private final static Produced POISON = new Produced(-1); 

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(true) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Produced queueElement = inputQueue.take();
            Thread.sleep(new Random().nextInt(100));
            if(queueElement==POISON) {
                break;
            }
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Consumer "+Thread.currentThread().getName()+" END");
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void stopRunning() {
    try {
        inputQueue.put(POISON);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

}

这在很大程度上受到了 JohnVint 在下面的回答的启发,只做了一些小的修改。

=== 由于@vendhan 的评论而更新。

感谢您的关注。你是对的,这个问题中的第一段代码(在其他问题中)有一个while(isRunning || !inputQueue.isEmpty())没有真正意义的代码段。

在我的实际最终实现中,我做了一些更接近您替换“||”的建议 (or) with "&&" (and),意思是每个工人(消费者)现在只检查他从列表中得到的元素是否是毒丸,如果是的话就停止(所以理论上我们可以说工人有正在运行且队列不能为空)。

4

6 回答 6

87

take()您应该从队列中继续。您可以使用毒丸来告诉工人停下来。例如:

private final Object POISON_PILL = new Object();

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(isRunning) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Object queueElement = inputQueue.take();
            if(queueElement == POISON_PILL) {
                 inputQueue.add(POISON_PILL);//notify other threads to stop
                 return;
            }
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void finish() {
    //you can also clear here if you wanted
    isRunning = false;
    inputQueue.add(POISON_PILL);
}
于 2012-01-23T16:10:46.230 回答
14

我会向工人发送一个特殊的工作包,以表明他们应该关闭:

public class ConsumerWorker implements Runnable{

private static final Produced DONE = new Produced();

private BlockingQueue<Produced> inputQueue;

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    for (;;) {
        try {
            Produced item = inputQueue.take();
            if (item == DONE) {
                inputQueue.add(item); // keep in the queue so all workers stop
                break;
            }
            // process `item`
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

}

要停止工作人员,只需添加ConsumerWorker.DONE到队列中。

于 2012-01-23T16:15:21.800 回答
1

在您尝试从队列中检索元素的代码块中,使用poll(time,unit)而不是take().

try { 
    Object queueElement = inputQueue.poll(timeout,unit);
     //process queueElement        
 } catch (InterruptedException e) {
        if(!isRunning && queue.isEmpty())
         return ; 
 } 

通过指定适当的 timeout 值,您可以确保线程不会继续阻塞,以防万一不幸发生

  1. isRunning是真的
  2. 队列变为空,因此线程进入阻塞等待(如果使用take()
  3. isRunning设置为假
于 2012-01-23T16:47:54.667 回答
1

我们不能使用 a 来做到这一点CountDownLatch,其中大小是生产者中的记录数。并且每个消费者都会countDown处理一个记录。awaits()并且它在所有任务完成时交叉方法。然后停止所有消费者。因为所有记录都已处理。

于 2017-11-15T06:40:14.090 回答
0

您可以使用多种策略,但一个简单的策略是拥有一个任务子类来表示工作的结束。生产者不直接发送此信号。相反,它将这个任务子类的一个实例排入队列。当您的一个消费者完成此任务并执行它时,就会发送信号。

于 2012-01-23T16:11:10.123 回答
0

我不得不使用多线程生产者和多线程消费者。我最终得到了一个Scheduler -- N Producers -- M Consumers方案,每两个通过一个队列进行通信(总共两个队列)。调度器用生产数据的请求填充第一个队列,然后用 N 个“毒丸”填充它。有一个活跃生产者计数器(atomic int),最后一个收到最后一个毒丸的生产者将M个毒丸发送到消费者队列。

于 2017-02-13T09:32:18.917 回答