文章目录
- 很多 Java 工程师在准备面试时,会刷很多八股文,线程和线程池这一块通常会准备线程的状态、线程的创建方式,Executors 里面的一些工厂方法和为什么不推荐使用这些工厂方法,ThreadPoolExecutor 构造方法的一些参数和执行过程等。 工作中,很多人会使用线程池的 submit 方法 获取 Future 类型的返回值,然后使用 java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit) 实现“最多等多久”的效果。 但很多人对此的理解只停留在表面上,稍微问深一点点可能就懵逼了。 比如,java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit) 超时之后,当前线程会怎样?线程池里执行对应任务的线程会有怎样的表现? 如果你对这个问题没有很大的把握,说明你掌握的还不够扎实。 最常见的理解就是,“超时以后,当前线程继续执行,线程池里的对应线程中断”,真的是这样吗?
-
- 下面给出一个简单的模拟案例: package basic.thread;import java.util.concurrent.*;public class FutureDemo { public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException { ExecutorService executorService = Executors.newFixedThreadPool(2); Future<?> future = executorService.submit(() -> { try { demo(); } catch (InterruptedException e) { throw new RuntimeException(e); } }); String threadName = Thread.currentThread().getName(); System.out.println(threadName + "获取的结果 -- start"); Object result = future.get(100, TimeUnit.MILLISECONDS); System.out.println(threadName + "获取的结果 -- end :" + result); } private static String demo() throws InterruptedException { String threadName = Thread.currentThread().getName(); System.out.println(threadName + ",执行 demo -- start"); TimeUnit.SECONDS.sleep(1); System.out.println(threadName + ",执行 demo -- end"); return "test"; }} 输出结果: main获取的结果 -- startpool-1-thread-1,执行 demo -- startException in thread "main" java.util.concurrent.TimeoutException at java.util.concurrent.FutureTask.get(FutureTask.java:205) at basic.thread.FutureDemo.main(FutureDemo.java:20)pool-1-thread-1,执行 demo -- end 我们可以发现:当前线程会因为收到 TimeoutException 而被中断,线程池里对应的线程“却”继续执行完毕。
- 我们尝试对未完成的线程进行取消,发现 Future#cancel 有个 boolean 类型的参数。
/** * Attempts to cancel execution of this task. This attempt will * fail if the task has already completed, has already been cancelled, * or could not be cancelled for some other reason. If successful, * and this task has not started when {@code cancel} is called, * this task should never run. If the task has already started, * then the {@code mayInterruptIfRunning} parameter determines * whether the thread executing this task should be interrupted in * an attempt to stop the task. * * <p>After this method returns, subsequent calls to {@link #isDone} will * always return {@code true}. Subsequent calls to {@link #isCancelled} * will always return {@code true} if this method returned {@code true}. * * @param mayInterruptIfRunning {@code true} if the thread executing this * task should be interrupted; otherwise, in-progress tasks are allowed * to complete * @return {@code false} if the task could not be cancelled, * typically because it has already completed normally; * {@code true} otherwise */ boolean cancel(boolean mayInterruptIfRunning);
看源码注释我们可以知道:
当设置为 true 时,正在执行的任务将被中断(interrupted);
当设置为 false 时,如果任务正在执行中,那么仍然允许任务执行完成。
- 此时,为了不让主线程因为超时异常被中断,我们 try-catch 包起来。 package basic.thread;import org.junit.platform.commons.util.ExceptionUtils;import java.util.concurrent.*;public class FutureDemo { public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException { ExecutorService executorService = Executors.newFixedThreadPool(2); Future<?> future = executorService.submit(() -> { try { demo(); } catch (InterruptedException e) { throw new RuntimeException(e); } }); String threadName = Thread.currentThread().getName(); System.out.println(System.currentTimeMillis() + "," + threadName + "获取的结果 -- start"); try { Object result = future.get(100, TimeUnit.MILLISECONDS); System.out.println(System.currentTimeMillis() + "," + threadName + "获取的结果 -- end :" + result); } catch (Exception e) { System.out.println(System.currentTimeMillis() + "," + threadName + "获取的结果异常:" + ExceptionUtils.readStackTrace(e)); } future.cancel(false); System.out.println(System.currentTimeMillis() + "," + threadName + "获取的结果 -- cancel"); } private static String demo() throws InterruptedException { String threadName = Thread.currentThread().getName(); System.out.println(System.currentTimeMillis() + "," + threadName + ",执行 demo -- start"); TimeUnit.SECONDS.sleep(1); System.out.println(System.currentTimeMillis() + "," + threadName + ",执行 demo -- end"); return "test"; }} 结果: 1653751759233,main获取的结果 -- start1653751759233,pool-1-thread-1,执行 demo -- start1653751759343,main获取的结果异常:java.util.concurrent.TimeoutException at java.util.concurrent.FutureTask.get(FutureTask.java:205) at basic.thread.FutureDemo.main(FutureDemo.java:23)1653751759351,main获取的结果 -- cancel1653751760263,pool-1-thread-1,执行 demo -- end 我们发现,线程池里的对应线程在 cancel(false) 时,如果已经正在执行,则会继续执行完成。
- package basic.thread;import org.junit.platform.commons.util.ExceptionUtils;import java.util.concurrent.*;public class FutureDemo { public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException { ExecutorService executorService = Executors.newFixedThreadPool(2); Future<?> future = executorService.submit(() -> { try { demo(); } catch (InterruptedException e) { System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + ", Interrupted:" + ExceptionUtils.readStackTrace(e)); throw new RuntimeException(e); } }); String threadName = Thread.currentThread().getName(); System.out.println(System.currentTimeMillis() + "," + threadName + "获取的结果 -- start"); try { Object result = future.get(100, TimeUnit.MILLISECONDS); System.out.println(System.currentTimeMillis() + "," + threadName + "获取的结果 -- end :" + result); } catch (Exception e) { System.out.println(System.currentTimeMillis() + "," + threadName + "获取的结果异常:" + ExceptionUtils.readStackTrace(e)); } future.cancel(true); System.out.println(System.currentTimeMillis() + "," + threadName + "获取的结果 -- cancel"); } private static String demo() throws InterruptedException { String threadName = Thread.currentThread().getName(); System.out.println(System.currentTimeMillis() + "," + threadName + ",执行 demo -- start"); TimeUnit.SECONDS.sleep(1); System.out.println(System.currentTimeMillis() + "," + threadName + ",执行 demo -- end"); return "test"; }} 执行结果: 1653752011246,main获取的结果 -- start1653752011246,pool-1-thread-1,执行 demo -- start1653752011347,main获取的结果异常:java.util.concurrent.TimeoutException at java.util.concurrent.FutureTask.get(FutureTask.java:205) at basic.thread.FutureDemo.main(FutureDemo.java:24)1653752011363,pool-1-thread-1, Interrupted:java.lang.InterruptedException: sleep interrupted at java.lang.Thread.sleep(Native Method) at java.lang.Thread.sleep(Thread.java:340) at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386) at basic.thread.FutureDemo.demo(FutureDemo.java:36) at basic.thread.FutureDemo.lambda$main$0(FutureDemo.java:14) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)1653752011366,main获取的结果 -- cancel 可以看出,此时,如果目标线程未执行完,那么会收到 InterruptedException ,被中断。 当然,如果此时不希望目标线程被中断,可以使用 try-catch 包住,再执行其他逻辑。 package basic.thread;import org.junit.platform.commons.util.ExceptionUtils;import java.util.concurrent.*;public class FutureDemo { public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException { ExecutorService executorService = Executors.newFixedThreadPool(2); Future<?> future = executorService.submit(() -> { demo(); }); String threadName = Thread.currentThread().getName(); System.out.println(System.currentTimeMillis() + "," + threadName + "获取的结果 -- start"); try { Object result = future.get(100, TimeUnit.MILLISECONDS); System.out.println(System.currentTimeMillis() + "," + threadName + "获取的结果 -- end :" + result); } catch (Exception e) { System.out.println(System.currentTimeMillis() + "," + threadName + "获取的结果异常:" + ExceptionUtils.readStackTrace(e)); } future.cancel(true); System.out.println(System.currentTimeMillis() + "," + threadName + "获取的结果 -- cancel"); } private static String demo() { String threadName = Thread.currentThread().getName(); System.out.println(System.currentTimeMillis() + "," + threadName + ",执行 demo -- start"); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { System.out.println(System.currentTimeMillis() + "," + threadName + ",执行 demo 被中断,自动降级"); } System.out.println(System.currentTimeMillis() + "," + threadName + ",执行 demo -- end"); return "test"; }} 执行结果: 1653752219803,main获取的结果 -- start1653752219803,pool-1-thread-1,执行 demo -- start1653752219908,main获取的结果异常:java.util.concurrent.TimeoutException at java.util.concurrent.FutureTask.get(FutureTask.java:205) at basic.thread.FutureDemo.main(FutureDemo.java:19)1653752219913,main获取的结果 -- cancel1653752219914,pool-1-thread-1,执行 demo 被中断,自动降级1653752219914,pool-1-thread-1,执行 demo -- end
- 我们直接看 java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit) 的源码注释,就可以清楚地知道各种情况的表现: /** * Waits if necessary for at most the given time for the computation * to complete, and then retrieves its result, if available. * * @param timeout the maximum time to wait * @param unit the time unit of the timeout argument * @return the computed result * @throws CancellationException if the computation was cancelled * @throws ExecutionException if the computation threw an * exception * @throws InterruptedException if the current thread was interrupted * while waiting * @throws TimeoutException if the wait timed out */ V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; 我们还可以选取几个常见的实现类,查看下实现的基本思路: java.util.concurrent.FutureTask#get(long, java.util.concurrent.TimeUnit) public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); } java.util.concurrent.CompletableFuture#get(long, java.util.concurrent.TimeUnit) /** * Waits if necessary for at most the given time for this future * to complete, and then returns its result, if available. * * @param timeout the maximum time to wait * @param unit the time unit of the timeout argument * @return the result value * @throws CancellationException if this future was cancelled * @throws ExecutionException if this future completed exceptionally * @throws InterruptedException if the current thread was interrupted * while waiting * @throws TimeoutException if the wait timed out */ public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { Object r; long nanos = unit.toNanos(timeout); return reportGet((r = result) == null ? timedGet(nanos) : r); } /** * Returns raw result after waiting, or null if interrupted, or * throws TimeoutException on timeout. */ private Object timedGet(long nanos) throws TimeoutException { if (Thread.interrupted()) return null; if (nanos <= 0L) throw new TimeoutException(); long d = System.nanoTime() + nanos; Signaller q = new Signaller(true, nanos, d == 0L ? 1L : d); // avoid 0 boolean queued = false; Object r; // We intentionally don't spin here (as waitingGet does) because // the call to nanoTime() above acts much like a spin. while ((r = result) == null) { if (!queued) queued = tryPushStack(q); else if (q.interruptControl < 0 || q.nanos <= 0L) { q.thread = null; cleanStack(); if (q.interruptControl < 0) return null; throw new TimeoutException(); } else if (q.thread != null && result == null) { try { ForkJoinPool.managedBlock(q); } catch (InterruptedException ie) { q.interruptControl = -1; } } } if (q.interruptControl < 0) r = null; q.thread = null; postComplete(); return r; } java.util.concurrent.Future#cancel 也一样 /** * Attempts to cancel execution of this task. This attempt will * fail if the task has already completed, has already been cancelled, * or could not be cancelled for some other reason. If successful, * and this task has not started when {@code cancel} is called, * this task should never run. If the task has already started, * then the {@code mayInterruptIfRunning} parameter determines * whether the thread executing this task should be interrupted in * an attempt to stop the task. * * <p>After this method returns, subsequent calls to {@link #isDone} will * always return {@code true}. Subsequent calls to {@link #isCancelled} * will always return {@code true} if this method returned {@code true}. * * @param mayInterruptIfRunning {@code true} if the thread executing this * task should be interrupted; otherwise, in-progress tasks are allowed * to complete * @return {@code false} if the task could not be cancelled, * typically because it has already completed normally; * {@code true} otherwise */boolean cancel(boolean mayInterruptIfRunning); java.util.concurrent.FutureTask#cancel public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true; } 可以看到 mayInterruptIfRunning 为 true 时,会执行 Thread#interrupt 方法 java.util.concurrent.CompletableFuture#cancel /** * If not already completed, completes this CompletableFuture with * a {@link CancellationException}. Dependent CompletableFutures * that have not already completed will also complete * exceptionally, with a {@link CompletionException} caused by * this {@code CancellationException}. * * @param mayInterruptIfRunning this value has no effect in this * implementation because interrupts are not used to control * processing. * * @return {@code true} if this task is now cancelled */ public boolean cancel(boolean mayInterruptIfRunning) { boolean cancelled = (result == null) && internalComplete(new AltResult(new CancellationException())); postComplete(); return cancelled || isCancelled(); } 通过注释我们也发现,不同的实现类对参数的“效果”也有差异。
- 我们学习时不应该想当然,不能纸上谈兵,对于不太理解的地方,可以多看源码注释,多看源码,多写 DEMO 去模拟或调试。 创作不易,如果本文对你有帮助,欢迎点赞、收藏加关注,你的支持和鼓励,是我创作的最大动力。
很多 Java 工程师在准备面试时,会刷很多八股文,线程和线程池这一块通常会准备线程的状态、线程的创建方式,Executors 里面的一些工厂方法和为什么不推荐使用这些工厂方法,ThreadPoolExecutor 构造方法的一些参数和执行过程等。
工作中,很多人会使用线程池的 submit 方法 获取 Future 类型的返回值,然后使用 java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit) 实现“最多等多久”的效果。
但很多人对此的理解只停留在表面上,稍微问深一点点可能就懵逼了。

比如,java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit) 超时之后,当前线程会怎样?线程池里执行对应任务的线程会有怎样的表现?
如果你对这个问题没有很大的把握,说明你掌握的还不够扎实。
最常见的理解就是,“超时以后,当前线程继续执行,线程池里的对应线程中断”,真的是这样吗?
下面给出一个简单的模拟案例:
package basic.thread;
import java.util.concurrent.*;
public class FutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
ExecutorService executorService = Executors.newFixedThreadPool(2);
Future<?> future = executorService.submit(() -> {
try {
demo();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
String threadName = Thread.currentThread().getName();
System.out.println(threadName + "获取的结果 -- start");
Object result = future.get(100, TimeUnit.MILLISECONDS);
System.out.println(threadName + "获取的结果 -- end :" + result);
}
private static String demo() throws InterruptedException {
String threadName = Thread.currentThread().getName();
System.out.println(threadName + ",执行 demo -- start");
TimeUnit.SECONDS.sleep(1);
System.out.println(threadName + ",执行 demo -- end");
return "test";
}
}
输出结果:
main获取的结果 -- start
pool-1-thread-1,执行 demo -- start
Exception in thread "main" java.util.concurrent.TimeoutException
at java.util.concurrent.FutureTask.get(FutureTask.java:205)
at basic.thread.FutureDemo.main(FutureDemo.java:20)
pool-1-thread-1,执行 demo -- end
我们可以发现:当前线程会因为收到 TimeoutException 而被中断,线程池里对应的线程“却”继续执行完毕。
我们尝试对未完成的线程进行取消,发现 Future#cancel 有个 boolean 类型的参数。
/**
* Attempts to cancel execution of this task. This attempt will
* fail if the task has already completed, has already been cancelled,
* or could not be cancelled for some other reason. If successful,
* and this task has not started when {@code cancel} is called,
* this task should never run. If the task has already started,
* then the {@code mayInterruptIfRunning} parameter determines
* whether the thread executing this task should be interrupted in
* an attempt to stop the task.
*
* <p>After this method returns, subsequent calls to {@link #isDone} will
* always return {@code true}. Subsequent calls to {@link #isCancelled}
* will always return {@code true} if this method returned {@code true}.
*
* @param mayInterruptIfRunning {@code true} if the thread executing this
* task should be interrupted; otherwise, in-progress tasks are allowed
* to complete
* @return {@code false} if the task could not be cancelled,
* typically because it has already completed normally;
* {@code true} otherwise
*/
boolean cancel(boolean mayInterruptIfRunning);
看源码注释我们可以知道:
当设置为 true 时,正在执行的任务将被中断(interrupted);
当设置为 false 时,如果任务正在执行中,那么仍然允许任务执行完成。
此时,为了不让主线程因为超时异常被中断,我们 try-catch 包起来。
package basic.thread;
import org.junit.platform.commons.util.ExceptionUtils;
import java.util.concurrent.*;
public class FutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
ExecutorService executorService = Executors.newFixedThreadPool(2);
Future<?> future = executorService.submit(() -> {
try {
demo();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
String threadName = Thread.currentThread().getName();
System.out.println(System.currentTimeMillis() + "," + threadName + "获取的结果 -- start");
try {
Object result = future.get(100, TimeUnit.MILLISECONDS);
System.out.println(System.currentTimeMillis() + "," + threadName + "获取的结果 -- end :" + result);
} catch (Exception e) {
System.out.println(System.currentTimeMillis() + "," + threadName + "获取的结果异常:" + ExceptionUtils.readStackTrace(e));
}
future.cancel(false);
System.out.println(System.currentTimeMillis() + "," + threadName + "获取的结果 -- cancel");
}
private static String demo() throws InterruptedException {
String threadName = Thread.currentThread().getName();
System.out.println(System.currentTimeMillis() + "," + threadName + ",执行 demo -- start");
TimeUnit.SECONDS.sleep(1);
System.out.println(System.currentTimeMillis() + "," + threadName + ",执行 demo -- end");
return "test";
}
}
结果:
1653751759233,main获取的结果 -- start
1653751759233,pool-1-thread-1,执行 demo -- start
1653751759343,main获取的结果异常:java.util.concurrent.TimeoutException
at java.util.concurrent.FutureTask.get(FutureTask.java:205)
at basic.thread.FutureDemo.main(FutureDemo.java:23)
1653751759351,main获取的结果 -- cancel
1653751760263,pool-1-thread-1,执行 demo -- end
我们发现,线程池里的对应线程在 cancel(false) 时,如果已经正在执行,则会继续执行完成。
package basic.thread;
import org.junit.platform.commons.util.ExceptionUtils;
import java.util.concurrent.*;
public class FutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
ExecutorService executorService = Executors.newFixedThreadPool(2);
Future<?> future = executorService.submit(() -> {
try {
demo();
} catch (InterruptedException e) {
System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + ", Interrupted:" + ExceptionUtils.readStackTrace(e));
throw new RuntimeException(e);
}
});
String threadName = Thread.currentThread().getName();
System.out.println(System.currentTimeMillis() + "," + threadName + "获取的结果 -- start");
try {
Object result = future.get(100, TimeUnit.MILLISECONDS);
System.out.println(System.currentTimeMillis() + "," + threadName + "获取的结果 -- end :" + result);
} catch (Exception e) {
System.out.println(System.currentTimeMillis() + "," + threadName + "获取的结果异常:" + ExceptionUtils.readStackTrace(e));
}
future.cancel(true);
System.out.println(System.currentTimeMillis() + "," + threadName + "获取的结果 -- cancel");
}
private static String demo() throws InterruptedException {
String threadName = Thread.currentThread().getName();
System.out.println(System.currentTimeMillis() + "," + threadName + ",执行 demo -- start");
TimeUnit.SECONDS.sleep(1);
System.out.println(System.currentTimeMillis() + "," + threadName + ",执行 demo -- end");
return "test";
}
}
import org.junit.platform.commons.util.ExceptionUtils;
import java.util.concurrent.*;
public class FutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
ExecutorService executorService = Executors.newFixedThreadPool(2);
Future<?> future = executorService.submit(() -> {
try {
demo();
} catch (InterruptedException e) {
System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + ", Interrupted:" + ExceptionUtils.readStackTrace(e));
throw new RuntimeException(e);
}
});
String threadName = Thread.currentThread().getName();
System.out.println(System.currentTimeMillis() + "," + threadName + "获取的结果 -- start");
try {
Object result = future.get(100, TimeUnit.MILLISECONDS);
System.out.println(System.currentTimeMillis() + "," + threadName + "获取的结果 -- end :" + result);
} catch (Exception e) {
System.out.println(System.currentTimeMillis() + "," + threadName + "获取的结果异常:" + ExceptionUtils.readStackTrace(e));
}
future.cancel(true);
System.out.println(System.currentTimeMillis() + "," + threadName + "获取的结果 -- cancel");
}
private static String demo() throws InterruptedException {
String threadName = Thread.currentThread().getName();
System.out.println(System.currentTimeMillis() + "," + threadName + ",执行 demo -- start");
TimeUnit.SECONDS.sleep(1);
System.out.println(System.currentTimeMillis() + "," + threadName + ",执行 demo -- end");
return "test";
}
}
执行结果:
1653752011246,main获取的结果 -- start
1653752011246,pool-1-thread-1,执行 demo -- start
1653752011347,main获取的结果异常:java.util.concurrent.TimeoutException
at java.util.concurrent.FutureTask.get(FutureTask.java:205)
at basic.thread.FutureDemo.main(FutureDemo.java:24)
1653752011363,pool-1-thread-1, Interrupted:java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at java.lang.Thread.sleep(Thread.java:340)
at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
at basic.thread.FutureDemo.demo(FutureDemo.java:36)
at basic.thread.FutureDemo.lambda$main$0(FutureDemo.java:14)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
1653752011366,main获取的结果 -- cancel
可以看出,此时,如果目标线程未执行完,那么会收到 InterruptedException ,被中断。
当然,如果此时不希望目标线程被中断,可以使用 try-catch 包住,再执行其他逻辑。
package basic.thread;
import org.junit.platform.commons.util.ExceptionUtils;
import java.util.concurrent.*;
public class FutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
ExecutorService executorService = Executors.newFixedThreadPool(2);
Future<?> future = executorService.submit(() -> {
demo();
});
String threadName = Thread.currentThread().getName();
System.out.println(System.currentTimeMillis() + "," + threadName + "获取的结果 -- start");
try {
Object result = future.get(100, TimeUnit.MILLISECONDS);
System.out.println(System.currentTimeMillis() + "," + threadName + "获取的结果 -- end :" + result);
} catch (Exception e) {
System.out.println(System.currentTimeMillis() + "," + threadName + "获取的结果异常:" + ExceptionUtils.readStackTrace(e));
}
future.cancel(true);
System.out.println(System.currentTimeMillis() + "," + threadName + "获取的结果 -- cancel");
}
private static String demo() {
String threadName = Thread.currentThread().getName();
System.out.println(System.currentTimeMillis() + "," + threadName + ",执行 demo -- start");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
System.out.println(System.currentTimeMillis() + "," + threadName + ",执行 demo 被中断,自动降级");
}
System.out.println(System.currentTimeMillis() + "," + threadName + ",执行 demo -- end");
return "test";
}
}
执行结果:
1653752219803,main获取的结果 -- start
1653752219803,pool-1-thread-1,执行 demo -- start
1653752219908,main获取的结果异常:java.util.concurrent.TimeoutException
at java.util.concurrent.FutureTask.get(FutureTask.java:205)
at basic.thread.FutureDemo.main(FutureDemo.java:19)
1653752219913,main获取的结果 -- cancel
1653752219914,pool-1-thread-1,执行 demo 被中断,自动降级
1653752219914,pool-1-thread-1,执行 demo -- end
我们直接看 java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit) 的源码注释,就可以清楚地知道各种情况的表现:
/**
* Waits if necessary for at most the given time for the computation
* to complete, and then retrieves its result, if available.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
* @throws TimeoutException if the wait timed out
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
我们还可以选取几个常见的实现类,查看下实现的基本思路:
java.util.concurrent.FutureTask#get(long, java.util.concurrent.TimeUnit)
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
java.util.concurrent.CompletableFuture#get(long, java.util.concurrent.TimeUnit)
/**
* Waits if necessary for at most the given time for this future
* to complete, and then returns its result, if available.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the result value
* @throws CancellationException if this future was cancelled
* @throws ExecutionException if this future completed exceptionally
* @throws InterruptedException if the current thread was interrupted
* while waiting
* @throws TimeoutException if the wait timed out
*/
public T get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
Object r;
long nanos = unit.toNanos(timeout);
return reportGet((r = result) == null ? timedGet(nanos) : r);
}
/**
* Returns raw result after waiting, or null if interrupted, or
* throws TimeoutException on timeout.
*/
private Object timedGet(long nanos) throws TimeoutException {
if (Thread.interrupted())
return null;
if (nanos <= 0L)
throw new TimeoutException();
long d = System.nanoTime() + nanos;
Signaller q = new Signaller(true, nanos, d == 0L ? 1L : d); // avoid 0
boolean queued = false;
Object r;
// We intentionally don't spin here (as waitingGet does) because
// the call to nanoTime() above acts much like a spin.
while ((r = result) == null) {
if (!queued)
queued = tryPushStack(q);
else if (q.interruptControl < 0 || q.nanos <= 0L) {
q.thread = null;
cleanStack();
if (q.interruptControl < 0)
return null;
throw new TimeoutException();
}
else if (q.thread != null && result == null) {
try {
ForkJoinPool.managedBlock(q);
} catch (InterruptedException ie) {
q.interruptControl = -1;
}
}
}
if (q.interruptControl < 0)
r = null;
q.thread = null;
postComplete();
return r;
}
java.util.concurrent.Future#cancel 也一样
/**
* Attempts to cancel execution of this task. This attempt will
* fail if the task has already completed, has already been cancelled,
* or could not be cancelled for some other reason. If successful,
* and this task has not started when {@code cancel} is called,
* this task should never run. If the task has already started,
* then the {@code mayInterruptIfRunning} parameter determines
* whether the thread executing this task should be interrupted in
* an attempt to stop the task.
*
* <p>After this method returns, subsequent calls to {@link #isDone} will
* always return {@code true}. Subsequent calls to {@link #isCancelled}
* will always return {@code true} if this method returned {@code true}.
*
* @param mayInterruptIfRunning {@code true} if the thread executing this
* task should be interrupted; otherwise, in-progress tasks are allowed
* to complete
* @return {@code false} if the task could not be cancelled,
* typically because it has already completed normally;
* {@code true} otherwise
*/
boolean cancel(boolean mayInterruptIfRunning);
java.util.concurrent.FutureTask#cancel
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
可以看到 mayInterruptIfRunning 为 true 时,会执行 Thread#interrupt 方法
java.util.concurrent.CompletableFuture#cancel
/**
* If not already completed, completes this CompletableFuture with
* a {@link CancellationException}. Dependent CompletableFutures
* that have not already completed will also complete
* exceptionally, with a {@link CompletionException} caused by
* this {@code CancellationException}.
*
* @param mayInterruptIfRunning this value has no effect in this
* implementation because interrupts are not used to control
* processing.
*
* @return {@code true} if this task is now cancelled
*/
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = (result == null) &&
internalComplete(new AltResult(new CancellationException()));
postComplete();
return cancelled || isCancelled();
}
通过注释我们也发现,不同的实现类对参数的“效果”也有差异。
我们学习时不应该想当然,不能纸上谈兵,对于不太理解的地方,可以多看源码注释,多看源码,多写 DEMO 去模拟或调试。
创作不易,如果本文对你有帮助,欢迎点赞、收藏加关注,你的支持和鼓励,是我创作的最大动力。
