Executors and Thread Pools
An Introduction with Examples in Java |
Prof. David Bernstein |
Computer Science Department |
bernstdh@jmu.edu |
sendMessage()
method in
the Dispatcher
is being executed in the
caller's thread of executionRealTimeDispatchHandler
and
the DailyDispatchHandler
sendMessage()
add tasks to a queue
and return "immediately"Executor
ObjectsThread
is being used, we can't`
call its interrupt()
or join()
methods)Executor
interface (i.e., create a subinterface)try { executor.shutdown(); executor.awaitTermination(WAIT, TimeUnit.SECONDS); } catch (InterruptedException ie) { } finally { if (!executor.isTerminated()) { } executor.shutdownNow(); }
import java.io.*; import java.util.*; /** * A RepeatingDirectoryLister will repeatedly save a directory listing * to the file system (in its own thread of execution). * * @author Prof. David Bernstein, James Madison University * @version 1.0 */ public class RepeatingDirectoryLister implements Runnable { private volatile boolean keepRunning; private long delay; private Thread thread; /** * Explicit Value Constructor. * * @param delay The delay between snapshots */ public RepeatingDirectoryLister(long delay) { this.delay = delay; } /** * The code to execute. */ public void run() { File wd; File[] listing; PrintStream out; wd = new File("."); while (keepRunning) { // Create the PrintStream for the snapshot try { out = new PrintStream(new File("directory.snp")); } catch (FileNotFoundException fnfe) { out = System.out; } // Create the snapshot out.println("Contents of " + wd + " at " + new Date()); listing = wd.listFiles(); for (int i=0; i<listing.length; i++) { out.println(listing[i]); } out.close(); // Sleep until it is time for the next snapshot try { thread.sleep(delay); } catch (InterruptedException ie) { } } thread = null; } /** * Start this RepeatingDirectoryLister. */ public void start() { if (thread == null) { keepRunning = true; thread = new Thread(this); thread.start(); } } /** * Stop this RepeatingDirectoryLister. */ public void stop() { keepRunning = false; thread.interrupt(); } }
import java.io.*; /** * A utility application that repeatedly saves a snapshot of the files in the * current directory. * * This version uses a RepeatingDirectoryLister * @author Prof. David Bernstein, James Madison University * @version 1 */ public class AutoDir { /** * The entry point of the application. * * args[0] contains the delay between snapshots. * * @param args The command-line arguments. */ public static void main(String[] args) throws IOException { long delay; // Process the command-line arguments delay = 1000; if ((args != null) && (args.length > 0)) { delay = Long.parseLong(args[0]); } // Construct and start the RepeatingDirectoryLister RepeatingDirectoryLister rdl = new RepeatingDirectoryLister(delay); rdl.start(); // The command-line "user interface" (such as it is) BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); System.out.println("Press [Enter] to stop."); in.readLine(); rdl.stop(); } }
import java.io.*; import java.io.*; import java.util.*; /** * A Runnable that will save a directory listing to the file system. * * @author Prof. David Bernstein, James Madison University * @version 1.0 */ public class DirectoryLister implements Runnable { /** * The code to execute. */ public void run() { File wd; File[] listing; PrintStream out; wd = new File("."); // Create the PrintStream for the snapshot try { out = new PrintStream(new File("directory.snp")); } catch (FileNotFoundException fnfe) { out = System.out; } // Print the header out.println("Contents of " + wd + " at " + new Date()); // Create the snapshot listing = wd.listFiles(); for (int i=0; i<listing.length; i++) { out.println(listing[i]); } out.close(); } }
ScheduledExecutorService
import java.io.*; import java.util.concurrent.*; /** * A utility application that repeatedly saves a snapshot of the files in the * current directory. * * This version uses a DirectoryLister and a ScheduledExecutorService. * @author Prof. David Bernstein, James Madison University * @version 2 */ public class AutoDir { /** * The entry point of the application. * * args[0] contains the delay between snapshots. * * @param args The command-line arguments. */ public static void main(String[] args) throws IOException { long delay; // Process the command-line arguments delay = 1000; if ((args != null) && (args.length > 0)) { delay = Long.parseLong(args[0]); } // Construct and start the DirectoryLister ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); scheduler.scheduleAtFixedRate(new DirectoryLister(), 0, delay, TimeUnit.MILLISECONDS); // The command-line "user interface" (such as it is) BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); System.out.println("Press [Enter] to stop."); in.readLine(); scheduler.shutdownNow(); } }
Runnable
is void
, the
tasks can't return anything (except indirectly through reference
parameters that are passed in)get()
- wait for the task to complete
(perhaps for a maximum amount of time)isDone()
- determine if the task is completecancel()
- attempt to cancel the taskisCancelled()
- determine if the task was cancelled
before completionFuture
can be used like a
latch
import java.io.*; import java.util.*; import java.util.concurrent.*; /** * A Runnable that will save a directory listing to the file system. * * This version implements the Callable interface so that it can return * the number of files in the directory. * * @author Prof. David Bernstein, James Madison University * @version 3.0 */ public class DirectoryLister implements Callable<Integer> { /** * The code to execute. */ public Integer call() { File wd; File[] listing; PrintStream out; wd = new File("."); // Create the PrintStream for the snapshot try { out = new PrintStream(new File("directory.snp")); } catch (FileNotFoundException fnfe) { out = System.out; } // Print the header out.println("Contents of " + wd + " at " + new Date()); // Create the snapshot listing = wd.listFiles(); for (int i=0; i<listing.length; i++) { out.println(listing[i]); } out.close(); return new Integer(listing.length); } }
import java.io.*; import java.util.concurrent.*; /** * A utility application that repeatedly saves a snapshot of the files in the * current directory. * * This version uses schules several DirectoryLister objects and waits for * them to complete. Also, this version can't be terminated. * * @author Prof. David Bernstein, James Madison University * @version 3 */ public class AutoDir { /** * The entry point of the application. * * args[0] contains the delay between snapshots. * * @param args The command-line arguments. */ public static void main(String[] args) throws IOException { int n; long delay; // Process the command-line arguments delay = 60000; // Every sixty seconds n = 60; // For an hour if ((args != null) && (args.length > 1)) { delay = Long.parseLong(args[0]); n = Integer.parseInt(args[1]); } // Create the ExecutorService ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); // Construct and schedule multiple DirectoryLister objects ScheduledFuture<Integer>[] result = new ScheduledFuture[n]; for (int i=0; i<n; i++) { result[i] = scheduler.schedule(new DirectoryLister(), i*delay, TimeUnit.MILLISECONDS); } // Wait for and then print the number of files in the directory for (int i=0; i<result.length; i++) { try { System.out.printf("Number of Files: %d\n", result[i].get().intValue()); } catch (ExecutionException ie) { System.out.println("Number of Files: Unavailable\n"); } catch (InterruptedException ie) { System.out.println("Number of Files: Unavailable\n"); } } // Shutdown the ExecutorService scheduler.shutdownNow(); } }
Future
Future
object can be useful even when
the task does not return anything since it provides other
functionalityFuture
object has a cancel()
methodFuture
object can be parameterized
using ?
to indicate that we don't
care about the return type
Terminator
Taskimport java.util.concurrent.*; /** * A Runnable that can be used to cancel a Future and shutdown the * ExecutorService. * * @author Prof. David Bernstein, James Madison University * @version 1.0 */ public class Terminator implements Runnable { ExecutorService service; Future<?> future; /** * Explicit Value Constructor. * * @param future The Future to be cancelled. * @param service The ExecutorService */ public Terminator(Future<?> future, ExecutorService service) { this.future = future; this.service = service; } /** * The code to execute. */ public void run() { // Cancel the Future future.cancel(true); // Shutdown the ExecutorService try { service.shutdown(); service.awaitTermination(2000, TimeUnit.MILLISECONDS); } catch (InterruptedException ie) { } finally { service.shutdownNow(); } } }
Terminator
Taskimport java.io.*; import java.util.concurrent.*; /** * A utility application that repeatedly saves a snapshot of the files in the * current directory. * * This version runs for a specific amount if time; it does not have * a UI that can be used to shut it down. * @author Prof. David Bernstein, James Madison University * @version 4 */ public class AutoDir { /** * The entry point of the application. * * args[0] contains the delay between snapshots. * * @param args The command-line arguments. */ public static void main(String[] args) throws IOException { long delay, duration; // Process the command-line arguments delay = 60000; // Once per minute duration = 3600000; // For an hour if ((args != null) && (args.length > 1)) { delay = Long.parseLong(args[0]); duration = Long.parseLong(args[1]); } // Construct the scheduler ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); // Construct and schedule the DirectoryLister Future<?> task; task = scheduler.scheduleAtFixedRate(new DirectoryLister(), 0, delay, TimeUnit.MILLISECONDS); // Construct and schedule the Terminator scheduler.schedule(new Terminator(task, scheduler), duration, TimeUnit.MILLISECONDS); } }
Executor
and ExecutorService
:
Runnable
to an Executor
happen-before its execution beginsCallable
to an ExecutorService
happen-before its execution beginsFuture
:
Future
happen-before actions subsequent
to the retrieval of the result via Future.get()
in another thread