ExecutorService如果您曾经启动线程,则通常不能从外部代码中断ExecutorService::execute(Runnable)线程,因为外部代码没有Thread对每个正在运行的线程的对象的引用(尽管请参阅此答案的结尾以获取解决方案,如果你需要ExecutorService::execute)。但是,如果您改为使用ExecutorService::submit(Callable<T>)提交作业,则会返回 a ,它在开始执行Future<T>后在内部保留对正在运行的线程的引用。Callable::call()这个线程可以通过调用来中断Future::cancel(true)。因此,任何检查当前线程中断状态的代码(或由其调用)Callable都可以通过Future引用中断。这包括BlockingQueue::take(),即使被阻塞,也会响应线程中断。(如果 JRE 阻塞方法在阻塞时被中断,通常会唤醒,意识到它们已被中断,并抛出一个InterruptedException.)
总结一下:既取消未来的工作Future::cancel(),同时也中断正在进行的工作(只要正在进行的工作响应线程中断)。这两个调用都不会影响已经成功完成的工作。Future::cancel(true)Future::cancel(true)cancel
请注意,一旦线程被取消中断,InterruptException就会在线程内抛出一个(例如,BlockingQueue::take()在这种情况下)。但是,下次您调用成功取消(即在完成之前被取消的 a)时,您的 aCancellationException将被扔回主线程。这与您通常期望的不同:如果未取消的throws ,下一次调用将 throw ,但如果取消的throws ,下一次调用将通过。Future::get()FutureFutureCallableInterruptedExceptionFuture::get()InterruptedExceptionCallableInterruptedExceptionFuture::get()CancellationException
这是一个说明这一点的例子:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
public class Test {
public static void main(String[] args) throws Exception {
// Start Executor with 4 threads
int numThreads = 4;
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(numThreads);
try {
// Set up BlockingQueue for inputs, and List<Future> for outputs
BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
List<Future<String>> futures = new ArrayList<>(numThreads);
for (int i = 0; i < numThreads; i++) {
int threadIdx = i;
futures.add(executor.submit(new Callable<String>() {
@Override
public String call() throws Exception {
try {
// Get an input from the queue (blocking)
int val = queue.take();
return "Thread " + threadIdx + " got value " + val;
} catch (InterruptedException e) {
// Thrown once Future::cancel(true) is called
System.out.println("Thread " + threadIdx + " got interrupted");
// This value is returned to the Future, but can never
// be read, since the caller will get a CancellationException
return "Thread " + threadIdx + " got no value";
}
}
}));
}
// Enqueue (numThreads - 1) values into the queue, so that one thread blocks
for (int i = 0; i < numThreads - 1; i++) {
queue.add(100 + i);
}
// Cancel all futures
for (int i = 0; i < futures.size(); i++) {
Future<String> future = futures.get(i);
// Cancel the Future -- this doesn't throw an exception until
// the get() method is called
future.cancel(/* mayInterruptIfRunning = */ true);
try {
System.out.println(future.get());
} catch (CancellationException e) {
System.out.println("Future " + i + " was cancelled");
}
}
} finally {
// Terminate main after all threads have shut down (this call does not block,
// so main will exit before the threads stop running)
executor.shutdown();
}
}
}
每次运行时,输出都会有所不同,但这里有一次运行:
Future 1 was cancelled
Future 0 was cancelled
Thread 2 got value 100
Thread 3 got value 101
Thread 1 got interrupted
这表明Future::cancel()调用了之前完成的线程 2 和线程 3。线程 1 被取消,所以内部InterruptedException被抛出,外部CancellationException被抛出。线程 0 在开始运行之前被取消。(请注意,线程索引通常与索引不相关Future,因此Future 0 was cancelled可能对应于线程 0 或线程 1 被取消,对于Future 1 was cancelled.)
高级:实现相同效果的一种方法 with Executor::execute(不返回Future引用)而不是使用 customExecutor::submit创建 a ,并为创建的每个线程在并发集合(例如并发队列)中记录一个引用。然后要取消所有线程,您可以简单地调用所有先前创建的线程。但是,您将需要处理在中断现有线程时可能会创建新线程的竞争条件。要处理此问题,请设置一个对可见的标志,告诉它不要再创建任何线程,然后一旦设置,取消现有线程。ThreadPoolExecutorThreadFactoryThreadFactoryThread::interrupt()AtomicBooleanThreadFactory