3

我正在用 Java 编写一个负载测试应用程序,并且有一个线程池可以针对被测服务器执行任务。因此,要制作 1000 个作业并在 5 个线程中运行它们,我会执行以下操作:

    ExecutorService pool = Executors.newFixedThreadPool(5);
    List<Runnable> jobs = makeJobs(1000);
    for(Runnable job : jobs){
        pool.execute(job);
    }

但是我认为这种方法不会很好地扩展,因为我必须提前制作所有“工作”对象并将它们放在内存中,直到需要它们为止。

我正在寻找一种方法,让池中的线程在每次需要新作业时都转到某种“JobFactory”类,并让工厂根据请求构建 Runnables,直到运行所需数量的作业。工厂可能会开始返回“null”以向线程发出没有更多工作要做的信号。

我可以手动编写这样的代码,但这似乎是一个足够常见的用例,我想知道在美妙但复杂的“java.util.concurrent”包中是否有任何东西可以用来代替?

4

2 回答 2

5

您可以使用 AtomicInteger 在线程池的执行线程中完成所有工作,以监视执行的可运行对象的数量

 int numberOfParties = 5;
 AtomicInteger numberOfJobsToExecute = new AtomicInteger(1000);
 ExecutorService pool = Executors.newFixedThreadPool(numberOfParties );
 for(int i =0; i < numberOfParties; i++){
     pool.submit(new Runnable(){
        public void run(){
            while(numberOfJobsToExecute.decrementAndGet() >= 0){
                makeJobs(1).get(0).run();
            }
        }
     });
 }

您还可以将返回的 Future 存储在 List 中并get()在它们上等待完成(以及其他机制)

于 2012-03-06T20:10:07.687 回答
4

人力资源管理系统。您可以创建BlockingQueue<Runnable>具有固定容量的 a 并让每个工作线程出列 aRunnable并运行它。然后你可以有一个生产者线程,它将作业放入队列。

主线程会做类似的事情:

// 100 is the capacity of the queue before blocking
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(100);
// start the submitter thread
new Thread(new JobSubmitterThread(queue)).start();
// make in a loop or something?
new Thread(new WorkerThread(queue)).start();
new Thread(new WorkerThread(queue)).start();
...

工人看起来像:

public class WorkerThread implements Runnable {
     private final BlockingQueue<Runnable> queue;
     public WorkerThread(BlockingQueue<Runnable> queue) {
         this.queue = queue;
     }
     public void run() {
         // run until the main thread shuts it down using volatile boolean or ...
         while (!shutdown) {
             Runnable job = queue.take();
             job.run();
         }
     }
}

作业提交者看起来像:

 public class JobSubmitterThread implements Runnable {
     private final BlockingQueue<Runnable> queue;
     public WorkerThread(BlockingQueue<Runnable> queue) {
         this.queue = queue;
     }
     public void run() {
         for (int jobC = 0; jobC < 1000; jobC++) {
             Runnable job = makeJob();
             // this would block when the queue reaches capacity
             queue.put(job);
         }
     }
 }
于 2012-03-06T20:05:03.193 回答