One way for executing code in a separate thread is to instantiate a Runnable
object,
pass it into a Thread
object and start it.
This makes the thread management tightly coupled together with the application.
The application has to create and starts a new Thread
object for every task, which is a costly operation.
This method uses the low-level Java API, which is sufficient for small applications.
For larger application, Java provides a more advanced solution, that is known as the Executors Framework.
Executor gives better scalability and multiprocessor utilization.
It decouples the thread management from the application.
The application only gets an executor object and passes the Runnable
object to it.
There are multiple executor implementations in the java.util.concurrent
package,
and most of them use thread pools.
Thread pools use worker threads and manages the whole life cycle of them.
It can reuse existing threads, and that reduces the thread creations.
It also manages the numbers of the threads and can create new ones if needed.
When a task is passed to an executor, maybe it is not executed immediately. Tasks are added to an internal queue, and the executor process this queue. If more tasks are sent to the executor as it can process, the tasks have to wait in the queue for a free thread. This improves the scalability. If as many thread were created as tasks and always started immediately, that could lead the application to crash.
Other important benefit of the executors is that they can return with a value after the execution.
java.util.concurrent
Package
The java.util.concurrent
package defines three executor interfaces:
- Executor
interface has only a single execute
method,
which takes a Runnable
object.
- ExecutorService
is a subinterface of the Executor
.
It has several methods for executing tasks and managing the life cycle both of the tasks and the executor.
- ScheduledExecutorService
is a subinterface of ExecutorService
, and
it provides methods for scheduling the tasks.
The java.util.concurrent.Executors
class has various factory methods for creating executors.
These methods return with an ExecutorService
or a ScheduledExecutorService
implementation.
Let’s see some examples, how to use the executors.
The Task
is a Runnable
class, which will be executed in the next examples.
It is very simple. It sleeps for one second, and after that prints out a done message like this:
11:42:02 pool-1-thread-1: task is done.
The sleep represents a long running operation. It needs one second to complete. The message contains a time and the thread name, these will be interesting in the outputs of the below examples.
package com.programcodex.concurrency.executors;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
public class Task implements Runnable {
static final DateTimeFormatter dtf =
DateTimeFormatter.ofPattern("HH:mm:ss");
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String time = dtf.format(LocalTime.now());
String name = Thread.currentThread().getName();
System.out.format("%s %s: task is done.%n", time, name);
}
}
The newSingleThreadExecutor
method of Executors
class creates an executor
that uses a single worker thread.
This executor is an implementation of the ExecutorService
interface.
The task
object is passed to the executor
three times by the execute
method.
As we saw above, the process time of a Task
is one second.
Now there is only one thread and three tasks, which means, the tasks are executed one after another,
so the total execution time is three seconds.
Beside the time, the output also shows that all tasks are executed by the same thread
because there is only one, the pool-1-thread-1
.
The shutdown
method initiates a shutdown for the executor, in which the previously submitted
tasks are executed, but no new tasks will be accepted.
If we try to execute another task after this, a RejectedExecutionException
will be thrown.
package com.programcodex.concurrency.executors;
import java.time.LocalTime;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class SingleThreadExecutor {
public static void main(String[] args) {
Runnable task = new Task();
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(task);
executor.execute(task);
executor.execute(task);
executor.shutdown();
System.out.format("%s %s: done.%n",
Task.dtf.format(LocalTime.now()),
Thread.currentThread().getName());
}
}
The output of the above class:
The newFixedThreadPool
method of Executors
class
creates a thread pool that reuses a fixed number of threads.
The number of the threads can be defined in its input parameter.
In the below example, a thread pool is created with three threads, and the task is executed seven times. This means, three tasks can be executed at the same time. The total execution time is three seconds. In the first two seconds all threads are working and processing six tasks. After that, one task left, so in the third second only one thread is working.
package com.programcodex.concurrency.executors;
import java.time.LocalTime;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class FixedThreadPool {
public static void main(String[] args) {
Runnable task = new Task();
ExecutorService executor = Executors.newFixedThreadPool(3);
executor.execute(task);
executor.execute(task);
executor.execute(task);
executor.execute(task);
executor.execute(task);
executor.execute(task);
executor.execute(task);
executor.shutdown();
System.out.format("%s %s: done.%n",
Task.dtf.format(LocalTime.now()),
Thread.currentThread().getName());
}
}
The output of the above class:
The newScheduledThreadPool
method of Executors
class creates a thread pool
that can schedule the tasks to run after a given delay or to execute periodically.
The number of the threads in the thread pool can be defined in its input parameter,
and it returns with a ScheduledExecutorService
implementation.
The next three examples present methods of this executor.
The schedule
method executes a task after a given delay.
It takes three parameters: a task, a delay and the time unit of the delay.
In the below example, the delays are four seconds.
The tasks are executed with a four seconds delay, and they process time is one second,
which means, the first task will be done after five seconds.
Now the scheduled thread pool created with two threads, and the tasks are scheduled five times. The execution starts after four seconds, but there are more tasks than threads, so some tasks have to wait for the execution.
package com.programcodex.concurrency.executors;
import java.time.LocalTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ScheduledThreadPool {
public static void main(String[] args) {
Runnable task = new Task();
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
executor.schedule(task, 4, TimeUnit.SECONDS);
executor.schedule(task, 4, TimeUnit.SECONDS);
executor.schedule(task, 4, TimeUnit.SECONDS);
executor.schedule(task, 4, TimeUnit.SECONDS);
executor.schedule(task, 4, TimeUnit.SECONDS);
executor.shutdown();
System.out.format("%s %s: done.%n",
Task.dtf.format(LocalTime.now()),
Thread.currentThread().getName());
}
}
The output of the above class:
In the next example, the ScheduledExecutorService
implementation created by the
newSingleThreadScheduledExecutor
method, which returns with single threaded executor.
The scheduleAtFixedRate
method of the ScheduledExecutorService
executes a task periodically.
It has four parameters: a task, an initial delay time before the first execution,
a period between the executions, and finally the time unit of the initial delay and the period.
The execution will only end if the executor receives a shutdown request, or the task throws an exception.
In the below example, the period is three seconds, the initial delay is five seconds, the execution time of the task is one second. So the first period is done after 5+1 seconds. After this, the task is done in every three seconds. This means, this method doesn’t take into account the one second execution time of the task. If the execution of the task takes longer than its period, then the next executions may start late, but will not concurrently execute.
The awaitTermination
method waits twenty seconds for the executor to finish, but it won’t.
The shutdown
method initiates the shutdown of the executor,
but lets the previously received tasks to execute.
package com.programcodex.concurrency.executors;
import java.time.LocalTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ScheduleAtFixedRate {
public static void main(String[] args) throws InterruptedException {
Runnable task = new Task();
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
executor.scheduleAtFixedRate(task, 5, 3, TimeUnit.SECONDS);
System.out.format("%s %s: The scheduled thread has been started.%n",
Task.dtf.format(LocalTime.now()),
Thread.currentThread().getName());
executor.awaitTermination(20, TimeUnit.SECONDS);
executor.shutdown();
System.out.format("%s %s: executor.shutdown() was called.%n",
Task.dtf.format(LocalTime.now()),
Thread.currentThread().getName());
}
}
The output of the above class:
The next example is almost the same as the previous, only the scheduleAtFixedRate
method replaced with
the scheduleWithFixedDelay
method, which takes into account the execution time of the task.
A new period starts after the previous task is done.
The period is three seconds, but the task needs one second to get done, so a task is done in every four seconds.
package com.programcodex.concurrency.executors;
import java.time.LocalTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ScheduleWithFixedDelay {
public static void main(String[] args) throws InterruptedException {
Runnable task = new Task();
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
executor.scheduleWithFixedDelay(task, 5, 3, TimeUnit.SECONDS);
System.out.format("%s %s: The scheduled thread has been started.%n",
Task.dtf.format(LocalTime.now()),
Thread.currentThread().getName());
executor.awaitTermination(20, TimeUnit.SECONDS);
executor.shutdown();
System.out.format("%s %s: executor.shutdown() was called.%n",
Task.dtf.format(LocalTime.now()),
Thread.currentThread().getName());
}
}
The output of the above class:
The above examples do not return with a value.
They executed a Runnable
class, which has a run
method, and
that method doesn’t have a return value.
If the thread needs to return with a value,
the Callable
interface has to be implemented instead of the Runnable
.
It has a call
method, which can return with an object.
The LengthOf
class implements the Callable
interface and
returns with an Integer
.
It receives a String
and returns with the length of that String
.
This is a long running operation because it needs three seconds to calculate the length.
Not like the run
method, the call
is allowed to throw Exception
,
so the sleep
method is not surrounded with a try/catch
block.
package com.programcodex.concurrency.executors.future;
import java.util.concurrent.Callable;
public class LengthOf implements Callable<Integer> {
private String str;
public LengthOf(String str) {
this.str = str;
}
@Override
public Integer call() throws Exception {
Thread.sleep(3000);
return str.length();
}
}
The ExecutorService
interface has also a submit
method,
which accepts a Callable
object and returns with a Future
.
This Future
will contains the result of the Callable
after the executor finished with it.
Future
provides a get
method, that returns with the result.
This method waits for the completion of the thread if necessary. Let’s see an example for this.
The LengthOf
class instantiated with the Hello word and submitted to the executor,
which returns with a Future
immediately.
The future.get()
tries to get the result, but the thread needs three second to calculate it,
so the main
method waits here until the thread gets done.
When it is done, the result is printed out.
The isDone
method of Future
returns with true
if the task is completed.
package com.programcodex.concurrency.executors.future;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class ReturnFuture {
private static final DateTimeFormatter dtf =
DateTimeFormatter.ofPattern("HH:mm:ss");
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Integer> future = executor.submit(new LengthOf("Hello"));
executor.shutdown();
print("future.isDone(): " + future.isDone());
Integer result;
try {
result = future.get();
} catch (InterruptedException | ExecutionException e) {
result = null;
}
print("The length of 'Hello' is " + result);
print("future.isDone(): " + future.isDone());
}
private static void print(String msg) {
System.out.println(dtf.format(LocalTime.now()) + " - " + msg);
}
}
The output of the above class:
Future
Timeout
The previous example is not very sophisticated.
If the thread never finishes, that makes the main
method to wait forever.
Future
provides another get
method, which takes a time parameter
and the time unit of the time parameter.
This method waits for the result at most the given time.
If the thread not finished before the time expires, a TimeoutException
is thrown.
The next example is very similar to the previous one, but the get
method only waits for two seconds.
The LengthOf
task needs three seconds to get done, so after two seconds
a TimeoutException
is thrown.
In the catch
block, the cancel
method tries to cancel execution of the task.
Our task is just running and interruptible, so this is done successfully.
After calling the cancel
method, the isDone
always returns with true
.
The isCancelled
method returns with true
if the cancellation was success.
package com.programcodex.concurrency.executors.future;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class FutureTimeout {
private static final DateTimeFormatter dtf =
DateTimeFormatter.ofPattern("HH:mm:ss");
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Integer> future = executor.submit(new LengthOf("Hello"));
executor.shutdown();
print("future.isDone(): " + future.isDone());
print("future.isCancelled(): " + future.isCancelled());
try {
print("future.get(): " + future.get(2, TimeUnit.SECONDS));
} catch (InterruptedException | ExecutionException | TimeoutException e) {
print("A " + e.getClass().getSimpleName() + " has been thrown.");
print("Is cancel of task success: " + future.cancel(true));
}
print("future.isDone(): " + future.isDone());
print("future.isCancelled(): " + future.isCancelled());
}
private static void print(String msg) {
System.out.println(dtf.format(LocalTime.now()) + " - " + msg);
}
}
The output of the above class: