0

我有 5000 个类似的 Callable 任务要在 Executors.newFixedThreadPool(8) 创建的 ExecutorService 的 8 个线程中执行。每个任务都进入数据库以检索大量数据以进行处理。

一切正常 99% 的时间,但有时我在日志文件中看到非常奇怪的执行日志消息,当 DB 很慢或卡住(不要问为什么)并且 8 个当前正在运行的任务在所有 8 个线程中都停止并且尚未完成, ExecutorService 开始提交更多的任务来一一执行!

所以日志显示,在某些时候 ExecutorService 发疯了,开始调用 Callable 的 call() 方法,等待队列中越来越多的任务没有等待前面的任务完成。越来越多的任务向 DB 发送请求,最终导致 DB 崩溃,Java 堆内存耗尽。

看起来 ExecutorService 内部发生了一些奇怪的事情,或者我对这种情况的理解是错误的。有没有人见过这样的东西?

我的大脑堆栈溢出了

ps 是来自 Java API 的引用:

Executors.newFixedThreadPool(int nThreads)

创建一个线程池,该线程池重用在共享无界队列上运行的固定数量的线程。在任何时候,最多 nThreads 个线程将是活动的处理任务。如果在所有线程都处于活动状态时提交了其他任务,它们将在队列中等待,直到有线程可用。如果任何线程在关闭之前的执行过程中由于失败而终止,如果需要执行后续任务,新的线程将取代它

如果我的任务导致线程死亡并且 ExecutorService 创建更多线程并向它们提交新的 8 个任务并且它们死亡并且 ExecutorService 创建更多线程并提交更多 8 个任务,这是否真的会发生?

pss:Callable 的 call() 内部的整个操作都被 try catch 包围,因此如果我的操作内部发生任何异常,则会捕获并记录该异常。这一切都没有发生。该调用被调用并且永远不会返回,而下一个任务被一个一个调用并且永远不会返回并且永远不会完成并且永远不会抛出任何异常。

我怀疑我的任务导致线程池中的线程死亡。怎么可能模仿?

4

2 回答 2

3

我也会尝试猜测:

  1. 您提交了 5000 个任务,这些任务涉及从数据库中获取数据。
  2. 不久之后,您会在所需的行/表上遇到严重的锁争用。也许外部进程正在获取用于写入的独占锁。也许有一个僵局。
  3. 一个接一个地,任务阻塞,等待共享/读取锁被授予。
  4. 似乎所有 8 个线程都已挂起,正在等待I/O
  5. 不久之后,数据库/数据库驱动程序注意到任务等待共享锁的时间过长。Lock Wait Timeout它按顺序简要地分发任务例外。
  6. 因此,一个接一个地,任务从队列中失败,等待的任务被推入执行,只是再次失败。

请注意,任务中的异常不会停止ExecutorService. 它只会将该任务标记为已完成并继续。

看这个例子:

public class Foo {

    static class Task implements Callable<String> {
        private static AtomicInteger i = new AtomicInteger(1);

        public String call() throws Exception {
            i.incrementAndGet();
            if (i.get() % 2 != 0) {
                throw new RuntimeException("That's odd, I failed.");
            }
            return "I'm done";
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService es = Executors.newFixedThreadPool(2);
        List<Future<String>> futures = new ArrayList<Future<String>>();
        for (int i = 0; i < 5; i++) {
            futures.add(es.submit(new Task()));
        }
        for (Future<String> future : futures) {
            try {
                System.out.println(future.get());
            } catch (ExecutionException ee) {
                System.err.println(ee.getCause());
            }
        }
        es.shutdown();
    }
}

可能的输出:

I'm done
I'm done
I'm done
java.lang.RuntimeException: That's odd, I failed.
java.lang.RuntimeException: That's odd, I failed.
于 2011-11-03T05:48:02.950 回答
0

这只是一个猜测,(我认为值得猜测,因为问题中缺少代码):

ExecutorService.invokeAll(Collection<? extends Callable<T>> tasks)如果当前任务抛出异常,将继续执行其他任务。(你在使用invokeAll()吗?我认为submit(Callable<T> task)有相同的行为,但从javadoc中并不清楚)

Future.isDone()你能在后续任务开始运行之前检查那些“卡住”的任务吗?潜在的异常被抛出并且在日志中看不到......

从javadoc:

请注意,已完成的任务可能已经正常终止,也可能通过引发异常终止。

http://download.oracle.com/javase/6/docs/api/java/util/concurrent/ExecutorService.html#invokeAll(java.util.Collection%29

如果这种情况,您可以在Callable.call()方法定义中捕获并记录所有异常。

高温高压

于 2011-11-03T04:54:52.213 回答