0

我正在开发一个需要并行和定期检查多个资源的程序:

public class JobRunner {

    private final SensorService sensorService;
    private ScheduledExecutorService executor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());

    public void run() {
        sensorService.finalAll().forEach(sensor -> {
            Runnable task = () -> {
                // read and save new data to log
                List<Double> data = sensor.fetchNewData();
                this.save(data);
            };

            // execute every 10 sec
            executor.scheduleWithFixedDelay(task, 0, 10, TimeUnit.SECONDS);
        });
    }

    public void save(List<Double> data) {
        // ...
    }
}

findAll调用返回大约 50 个传感器的列表,但是当我运行程序时,我看到虽然在第一个周期查询了所有传感器,但在后续执行中只调用了 2-3 个(例如 - 在 20 秒、30 秒等) . 我在想,由于某些传感器比其他传感器返回得更快,它们会更早地完成任务的等待周期,并被池中的下一个线程抓取,从而使其他完成速度较慢的任务饿死。

如何确保所有任务(传感器)都得到平等对待?这里有哪些最佳实践;我应该使用作业队列还是不同的并发机制?谢谢。

4

1 回答 1

1

在您的代码中有N=count service.findAll()计时器,这使得调试和测试更加困难。此外,不能保证旧任务会在合理的时间内被执行并且不会被新任务取代。如果你

  1. 使用单个计时器,在最后一次所有传感器检查完成后 10 秒触发传感器检查
  2. 计时器触发检查时同时通过传感器

请以下面的代码为例。它每 10 秒打印 50 个整数,然后 EOL。并行性是通过Stream API

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
executor.scheduleWithFixedDelay(new Runnable() {
    @Override
    public void run() {
        IntStream.range(0, 50).parallel().forEach(i -> System.out.print(i + " "));
        System.out.println();
    }
}, 0, 10, TimeUnit.SECONDS);

您可以替换ScheduledExecutorServiceTimer使代码更清晰。并且,作为一个选项,您可以使用另一个ExecutorService,而不是使用并行流,向它提交下一个N任务Timer并等待它们完成:

ExecutorService workerExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
Timer timer = new Timer();
timer.schedule(new TimerTask() {
    @Override
    public void run() {
        List<Future<Void>> futures = new ArrayList<>();
        for (int i = 0; i < 50; i++) {
            final int index = i;
            Future<Void> future = workerExecutor.submit(new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    System.out.print(index + " ");
                    return null;
                }
            });
            futures.add(future);
        }
        for (Future<Void> future : futures) {
            try {
                future.get();
            } catch (InterruptedException|ExecutionException e) {
                throw new RuntimeException();
            }
        }
        System.out.println();
    }
}, 0, 10_000);
于 2017-02-05T07:49:22.497 回答