Intro
In real-world programs, we often have to wait for sub-tasks to finish before we can proceed. Java offers several ways to handle this case.
Thread.join()
When we call thread.join(), the calling thread (here, the main thread) waits until that thread has completed. The code is as follows:
private static void threadJoin() {
    List<Thread> threads = new ArrayList<>();
    for (int i = 0; i < NUM; i++) {
        Thread t = new Thread(new PkslowTask("Task " + i));
        t.start();
        threads.add(t);
    }
    threads.forEach(t -> {
        try {
            t.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    });
    System.out.println("threadJoin Finished All Tasks...");
}
Output:
Task 6 is running
Task 9 is running
Task 3 is running
Task 4 is running
Task 7 is running
Task 0 is running
Task 2 is running
Task 1 is running
Task 5 is running
Task 8 is running
Task 1 is completed
Task 8 is completed
Task 6 is completed
Task 4 is completed
Task 3 is completed
Task 0 is completed
Task 7 is completed
Task 9 is completed
Task 2 is completed
Task 5 is completed
threadJoin Finished All Tasks...
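join() also has an overload that takes a timeout in milliseconds, which can help when a sub-task might hang. The sketch below is only an illustration: the class JoinWithTimeoutDemo and the method finishedWithin are names made up for this example, and the tasks simply sleep instead of doing real work.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; the names are illustrative and not part of the sample project.
class JoinWithTimeoutDemo {

    // Starts `count` threads that each sleep for `workMillis`, then waits for each
    // one with join(timeoutMillis). Returns true only if every thread had finished.
    static boolean finishedWithin(int count, long workMillis, long timeoutMillis) {
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            Thread t = new Thread(() -> {
                try {
                    Thread.sleep(workMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.setDaemon(true); // do not keep the JVM alive if we give up waiting
            t.start();
            threads.add(t);
        }
        boolean allDone = true;
        for (Thread t : threads) {
            try {
                t.join(timeoutMillis); // returns when t ends or the timeout elapses
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
            if (t.isAlive()) {
                allDone = false; // the timeout elapsed first
            }
        }
        return allDone;
    }

    public static void main(String[] args) {
        System.out.println("fast tasks done: " + finishedWithin(3, 50, 2000));
        System.out.println("slow tasks done: " + finishedWithin(3, 2000, 50));
    }
}
```

Note that the timeout applies to each join() call separately, so the total wait can be up to count times the timeout. join(millis) does not report whether it timed out, which is why the sketch checks isAlive() afterwards.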
CountDownLatch
CountDownLatch is a handy concurrency tool. We create an instance with an initial count. Each time a sub-thread calls countDown(), the count decreases by one. The await() method blocks until the count reaches 0. The code is as follows:
private static void countDownLatch() {
    CountDownLatch latch = new CountDownLatch(NUM);
    for (int i = 0; i < NUM; i++) {
        Thread t = new Thread(() -> {
            System.out.println("countDownLatch running...");
            try {
                Thread.sleep(1000);
                System.out.println("countDownLatch Finished...");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                // Count down even if the task fails, otherwise await() would block forever
                latch.countDown();
            }
        });
        t.start();
    }
    try {
        latch.await();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    System.out.println("countDownLatch Finished All Tasks...");
}
Output:
countDownLatch running...
countDownLatch running...
countDownLatch running...
countDownLatch running...
countDownLatch running...
countDownLatch running...
countDownLatch running...
countDownLatch running...
countDownLatch running...
countDownLatch running...
countDownLatch Finished...
countDownLatch Finished...
countDownLatch Finished...
countDownLatch Finished...
countDownLatch Finished...
countDownLatch Finished...
countDownLatch Finished...
countDownLatch Finished...
countDownLatch Finished...
countDownLatch Finished...
countDownLatch Finished All Tasks...
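If we do not want to block forever, await() also accepts a timeout and returns a boolean telling us whether the count reached zero in time. Here is a minimal sketch under that assumption; LatchTimeoutDemo and runWorkers are hypothetical names, and the workers just sleep.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the sample project.
class LatchTimeoutDemo {

    // Returns true if all workers counted down before the timeout elapsed.
    static boolean runWorkers(int workers, long workMillis, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(workers);
        for (int i = 0; i < workers; i++) {
            Thread t = new Thread(() -> {
                try {
                    Thread.sleep(workMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown(); // always count down, even on failure
                }
            });
            t.setDaemon(true); // let the JVM exit even if a worker is still sleeping
            t.start();
        }
        try {
            // true: count reached zero; false: the timeout elapsed first
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("finished in time: " + runWorkers(4, 100, 2000));
        System.out.println("finished in time: " + runWorkers(4, 2000, 100));
    }
}
```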
CyclicBarrier
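CyclicBarrier is similar to CountDownLatch, but it can be reset and reused. Each thread calls await() and blocks until all parties have arrived. Here the barrier is created with NUM + 1 parties because the main thread also waits on it: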
private static void cyclicBarrier() {
    CyclicBarrier barrier = new CyclicBarrier(NUM + 1);
    for (int i = 0; i < NUM; i++) {
        Thread t = new Thread(() -> {
            System.out.println("cyclicBarrier running...");
            try {
                Thread.sleep(1000);
                System.out.println("cyclicBarrier Finished...");
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                throw new RuntimeException(e);
            }
        });
        t.start();
    }
    try {
        barrier.await();
    } catch (InterruptedException | BrokenBarrierException e) {
        throw new RuntimeException(e);
    }
    System.out.println("cyclicBarrier Finished All Tasks...");
}
Output:
cyclicBarrier running...
cyclicBarrier running...
cyclicBarrier running...
cyclicBarrier running...
cyclicBarrier running...
cyclicBarrier running...
cyclicBarrier running...
cyclicBarrier running...
cyclicBarrier running...
cyclicBarrier running...
cyclicBarrier Finished...
cyclicBarrier Finished...
cyclicBarrier Finished...
cyclicBarrier Finished...
cyclicBarrier Finished...
cyclicBarrier Finished...
cyclicBarrier Finished...
cyclicBarrier Finished...
cyclicBarrier Finished...
cyclicBarrier Finished...
cyclicBarrier Finished All Tasks...
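To make the "reuse" part concrete, the sketch below runs several rounds through one barrier and counts how often it trips. It is an illustration only: BarrierReuseDemo and runRounds are made-up names, and the optional barrier action passed to the constructor is used here purely for counting.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of the sample project.
class BarrierReuseDemo {

    // Runs `rounds` phases with `workers` threads plus the calling thread
    // and returns how many times the barrier tripped.
    static int runRounds(int workers, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once per trip, when the last party arrives.
        CyclicBarrier barrier = new CyclicBarrier(workers + 1, trips::incrementAndGet);
        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // the same barrier is reused every round
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new RuntimeException(e);
                }
            }).start();
        }
        try {
            for (int r = 0; r < rounds; r++) {
                barrier.await(); // the calling thread is the extra (+1) party
            }
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new RuntimeException(e);
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println("barrier tripped " + runRounds(3, 4) + " times");
    }
}
```

A CountDownLatch could not be used this way, because its count cannot be raised again once it reaches zero.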
executorService.isTerminated()
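After calling shutdown(), we can poll isTerminated() to check whether all the tasks in the executor have completed. It never returns true unless shutdown() or shutdownNow() was called first. Note that the loop below busy-waits, so it keeps a CPU core occupied while waiting: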
private static void executeServiceIsTerminated() {
    ExecutorService executorService = Executors.newFixedThreadPool(THREADS);
    IntStream.range(0, NUM)
            .forEach(i -> executorService.execute(new PkslowTask("Task " + i)));
    executorService.shutdown();
    while (!executorService.isTerminated()) {
        // waiting...
    }
    System.out.println("executeServiceIsTerminated Finished All Tasks...");
}
Output:
Task 0 is running
Task 2 is running
Task 1 is running
Task 3 is running
Task 4 is running
Task 0 is completed
Task 2 is completed
Task 5 is running
Task 4 is completed
Task 7 is running
Task 3 is completed
Task 1 is completed
Task 8 is running
Task 6 is running
Task 9 is running
Task 5 is completed
Task 9 is completed
Task 7 is completed
Task 6 is completed
Task 8 is completed
executeServiceIsTerminated Finished All Tasks...
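If we want to keep the isTerminated() approach but avoid spinning at full speed, one option is to sleep briefly between checks. This is a sketch under that assumption, not the method used in the sample project; PollingTerminationDemo, runAndPoll and the poll interval are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of the sample project.
class PollingTerminationDemo {

    // Submits `tasks` trivial tasks, then polls isTerminated() every `pollMillis`.
    // Returns the number of tasks that ran.
    static int runAndPoll(int tasks, long pollMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(completed::incrementAndGet);
        }
        pool.shutdown(); // isTerminated() can only become true after shutdown
        while (!pool.isTerminated()) {
            try {
                Thread.sleep(pollMillis); // yield the CPU between checks
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                pool.shutdownNow(); // stop waiting and cancel remaining work
                break;
            }
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed tasks: " + runAndPoll(10, 5));
    }
}
```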
executorService.awaitTermination
After a shutdown request, executorService.awaitTermination blocks until all tasks have completed, the timeout elapses, or the current thread is interrupted, whichever happens first:
private static void executeServiceAwaitTermination() {
    ExecutorService executorService = Executors.newFixedThreadPool(THREADS);
    IntStream.range(0, NUM)
            .forEach(i -> executorService.execute(new PkslowTask("Task " + i)));
    executorService.shutdown();
    try {
        if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) {
            executorService.shutdownNow();
        }
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    System.out.println("executeServiceAwaitTermination Finished All Tasks...");
}
Output:
Task 0 is running
Task 1 is running
Task 2 is running
Task 3 is running
Task 4 is running
Task 0 is completed
Task 5 is running
Task 1 is completed
Task 4 is completed
Task 7 is running
Task 3 is completed
Task 8 is running
Task 2 is completed
Task 9 is running
Task 6 is running
Task 5 is completed
Task 7 is completed
Task 9 is completed
Task 8 is completed
Task 6 is completed
executeServiceAwaitTermination Finished All Tasks...
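The shutdown-then-wait steps are often wrapped in a small helper so that callers also learn whether the timeout was hit. The following is one possible shape for such a helper, loosely following the two-phase shutdown pattern described in the ExecutorService Javadoc. The names ShutdownHelperDemo, shutdownAndAwait and demo are assumptions for this example, and the sleeping tasks are placeholders.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the sample project.
class ShutdownHelperDemo {

    // Returns true if all tasks finished within the timeout,
    // false if the pool had to be cancelled with shutdownNow().
    static boolean shutdownAndAwait(ExecutorService pool, long timeoutMillis) {
        pool.shutdown(); // stop accepting new tasks; queued tasks still run
        try {
            if (pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return true;
            }
            pool.shutdownNow(); // interrupt running tasks, discard queued ones
            // Give the interrupted tasks a moment to react
            pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
            return false;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    // Submits `tasks` sleeping tasks to a two-thread pool and shuts it down.
    static boolean demo(int tasks, long workMillis, long timeoutMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(workMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // stop when cancelled
                }
            });
        }
        return shutdownAndAwait(pool, timeoutMillis);
    }

    public static void main(String[] args) {
        System.out.println("clean shutdown: " + demo(4, 100, 2000));
        System.out.println("clean shutdown: " + demo(1, 5000, 100));
    }
}
```

shutdownNow() can only cancel tasks that respond to interruption, so a task that ignores interrupts may keep running after the helper returns.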
executorService.invokeAll
invokeAll submits all the tasks at once and blocks until every one of them has completed:
private static void executeServiceInvokeAll() {
    ExecutorService executorService = Executors.newFixedThreadPool(THREADS);
    List<Callable<Void>> tasks = new ArrayList<>();
    IntStream.range(0, NUM)
            .forEach(i -> tasks.add(new PkslowTask("Task " + i)));
    try {
        executorService.invokeAll(tasks);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    executorService.shutdown();
    System.out.println("executeServiceInvokeAll Finished All Tasks...");
}
Output:
Task 1 is running
Task 2 is running
Task 0 is running
Task 3 is running
Task 4 is running
Task 1 is completed
Task 3 is completed
Task 0 is completed
Task 2 is completed
Task 4 is completed
Task 8 is running
Task 5 is running
Task 6 is running
Task 9 is running
Task 7 is running
Task 8 is completed
Task 5 is completed
Task 6 is completed
Task 9 is completed
Task 7 is completed
executeServiceInvokeAll Finished All Tasks...
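invokeAll also returns a list of Future objects, in the same order as the task list, so we can read results once it returns. A minimal sketch, assuming tasks that return a value; InvokeAllResultsDemo and squares are hypothetical names used only here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; not part of the sample project.
class InvokeAllResultsDemo {

    // Computes i * i for i in [0, n) on a thread pool and collects the results.
    static List<Integer> squares(int n) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                final int value = i;
                tasks.add(() -> value * value);
            }
            // Blocks until every task is done; futures follow the task list order
            List<Future<Integer>> futures = pool.invokeAll(tasks);
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> future : futures) {
                results.add(future.get()); // already done, so this does not block
            }
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(squares(5));
    }
}
```

If a task throws, invokeAll still returns normally; the exception surfaces as an ExecutionException when get() is called on that task's Future.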
ExecutorCompletionService
ExecutorCompletionService hands back tasks in the order they complete. take() blocks until the next completed task is available:
private static void executorCompletionService() {
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    CompletionService<String> service = new ExecutorCompletionService<>(executorService);
    List<Callable<String>> callables = new ArrayList<>();
    callables.add(new DelayedCallable(2000, "2000ms"));
    callables.add(new DelayedCallable(1500, "1500ms"));
    callables.add(new DelayedCallable(6000, "6000ms"));
    callables.add(new DelayedCallable(2500, "2500ms"));
    callables.add(new DelayedCallable(300, "300ms"));
    callables.add(new DelayedCallable(3000, "3000ms"));
    callables.add(new DelayedCallable(1100, "1100ms"));
    callables.add(new DelayedCallable(100, "100ms"));
    callables.add(new DelayedCallable(100, "100ms"));
    callables.add(new DelayedCallable(100, "100ms"));
    callables.forEach(service::submit);
    // Call take() once per submitted task; more calls than tasks would block forever
    for (int i = 0; i < callables.size(); i++) {
        try {
            Future<String> future = service.take();
            System.out.println(future.get() + " task is completed");
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
    System.out.println("executorCompletionService Finished All Tasks...");
    executorService.shutdown();
    awaitTerminationAfterShutdown(executorService);
}
The tasks take different amounts of time to complete, and whichever completes first is returned first:
2000ms is running
2500ms is running
300ms is running
1500ms is running
6000ms is running
3000ms is running
1100ms is running
100ms is running
100ms is running
100ms is running
100ms is completed
100ms is completed
100ms task is completed
100ms task is completed
100ms is completed
100ms task is completed
300ms is completed
300ms task is completed
1100ms is completed
1100ms task is completed
1500ms is completed
1500ms task is completed
2000ms is completed
2000ms task is completed
2500ms is completed
2500ms task is completed
3000ms is completed
3000ms task is completed
6000ms is completed
6000ms task is completed
executorCompletionService Finished All Tasks...
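The DelayedCallable class is not shown in this article. A simplified stand-in, assuming it just sleeps for the given delay and returns its name, is enough to reproduce the completion-order behaviour. CompletionOrderDemo and completionOrder are made-up names, and the stand-in omits the "is running" and "is completed" messages seen in the output above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; not part of the sample project.
class CompletionOrderDemo {

    // Assumed stand-in for DelayedCallable: sleeps, then returns its name.
    static class DelayedCallable implements Callable<String> {
        private final long delayMillis;
        private final String name;

        DelayedCallable(long delayMillis, String name) {
            this.delayMillis = delayMillis;
            this.name = name;
        }

        @Override
        public String call() throws InterruptedException {
            Thread.sleep(delayMillis);
            return name;
        }
    }

    // Submits one task per delay (at least one delay is required)
    // and returns the task names in completion order.
    static List<String> completionOrder(long... delays) {
        // One thread per task so that all tasks start at roughly the same time
        ExecutorService pool = Executors.newFixedThreadPool(delays.length);
        CompletionService<String> service = new ExecutorCompletionService<>(pool);
        try {
            for (long delay : delays) {
                service.submit(new DelayedCallable(delay, delay + "ms"));
            }
            List<String> order = new ArrayList<>();
            for (int i = 0; i < delays.length; i++) {
                order.add(service.take().get()); // next task to finish
            }
            return order;
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(completionOrder(400, 50, 200));
    }
}
```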
Code
The code is available on GitHub: https://github.com/LarryDpk/pkslow-samples
References:
https://stackoverflow.com/questions/4691533/java-wait-for-thread-to-finish
https://ducmanhphan.github.io/2020-03-20-Waiting-threads-to-finish-completely-in-Java/