|
Executors and Thread Pools
An Introduction with Examples in Java |
|
Prof. David Bernstein |
| Computer Science Department |
| bernstdh@jmu.edu |
sendMessage() method in
the Dispatcher is being executed in the
caller's thread of execution
RealTimeDispatchHandler and
the DailyDispatchHandler
sendMessage() add tasks to a queue
and return "immediately"
Executor Objects
Thread is being used, we can't
call its interrupt() or join()
methods)
Executor
interface (i.e., create a subinterface)
// Orderly shutdown of an ExecutorService: ask politely first, then force it.
try
{
    // Stop accepting new tasks, then give the submitted tasks up to WAIT
    // seconds to finish
    executor.shutdown();
    executor.awaitTermination(WAIT, TimeUnit.SECONDS);
}
catch (InterruptedException ie)
{
    // Don't swallow the interrupt; restore the status so that code further
    // up the call stack can still see that this thread was interrupted
    Thread.currentThread().interrupt();
}
finally
{
    if (!executor.isTerminated())
    {
        // Some tasks did not finish in time; this is the place to
        // report that they are about to be cancelled
    }
    // Cancel whatever is still running or waiting to run
    executor.shutdownNow();
}
import java.io.*;
import java.util.*;
/**
 * A RepeatingDirectoryLister will repeatedly save a directory listing
 * to the file system (in its own thread of execution).
 *
 * Each snapshot lists the current working directory and is written to the
 * file "directory.snp". If that file can't be opened, the snapshot is
 * written to System.out instead (and System.out is left open).
 *
 * @author  Prof. David Bernstein, James Madison University
 * @version 1.0
 */
public class RepeatingDirectoryLister implements Runnable
{
    private volatile boolean keepRunning;
    private final long       delay;
    // Written by start() and by the worker itself; read by stop()
    private volatile Thread  thread;

    /**
     * Explicit Value Constructor.
     *
     * @param delay  The delay between snapshots (in milliseconds)
     */
    public RepeatingDirectoryLister(long delay)
    {
        this.delay = delay;
    }

    /**
     * The code to execute. Snapshots are written until stop() is called.
     * (If start() has not been called this method returns immediately.)
     */
    public void run()
    {
        File wd = new File(".");

        try
        {
            while (keepRunning)
            {
                writeSnapshot(wd);

                // Sleep until it is time for the next snapshot
                try
                {
                    // sleep() is static; it always affects the current thread
                    Thread.sleep(delay);
                }
                catch (InterruptedException ie)
                {
                    // stop() interrupts this thread to cut the delay short;
                    // the loop condition decides whether to continue
                }
            }
        }
        finally
        {
            // Allow a subsequent start(), even after an abnormal exit. Only
            // the thread created by start() may clear the reference.
            if (thread == Thread.currentThread())
            {
                thread = null;
            }
        }
    }

    /**
     * Start this RepeatingDirectoryLister (if it isn't already running).
     */
    public synchronized void start()
    {
        if (thread == null)
        {
            keepRunning = true;
            thread = new Thread(this);
            thread.start();
        }
    }

    /**
     * Stop this RepeatingDirectoryLister. Calling this method when
     * the lister is not running has no effect.
     */
    public synchronized void stop()
    {
        keepRunning = false;

        // The worker clears the field when it ends, so use a local copy
        Thread worker = thread;
        if (worker != null)
        {
            worker.interrupt();
        }
    }

    /**
     * Write one snapshot of the given directory.
     *
     * @param wd  The directory to list
     */
    private void writeSnapshot(File wd)
    {
        PrintStream out;
        boolean     ownsStream;

        // Create the PrintStream for the snapshot
        try
        {
            out = new PrintStream(new File("directory.snp"));
            ownsStream = true;
        }
        catch (FileNotFoundException fnfe)
        {
            out = System.out;
            ownsStream = false;
        }

        // Create the snapshot
        try
        {
            out.println("Contents of " + wd + " at " + new Date());

            // listFiles() returns null if the directory can't be read
            File[] listing = wd.listFiles();
            if (listing != null)
            {
                for (int i=0; i<listing.length; i++)
                {
                    out.println(listing[i]);
                }
            }
        }
        finally
        {
            // Never close System.out; later snapshots may need it
            if (ownsStream) out.close();
            else            out.flush();
        }
    }
}
import java.io.*;
/**
 * A utility application that repeatedly saves a snapshot of the files in the
 * current directory.
 *
 * This version uses a RepeatingDirectoryLister
 * @author  Prof. David Bernstein, James Madison University
 * @version 1
 */
public class AutoDir
{
    /**
     * The entry point of the application.
     *
     * args[0] contains the delay between snapshots (the default is 1000).
     *
     * @param args  The command-line arguments.
     * @throws IOException            If the console can't be read
     * @throws NumberFormatException  If args[0] is not a valid long
     */
    public static void main(String[] args) throws IOException
    {
        // Process the command-line arguments
        long delay = 1000;
        if ((args != null) && (args.length > 0))
        {
            delay = Long.parseLong(args[0]);
        }

        // Construct and start the RepeatingDirectoryLister
        RepeatingDirectoryLister rdl = new RepeatingDirectoryLister(delay);
        rdl.start();

        try
        {
            // The command-line "user interface" (such as it is)
            BufferedReader in = new BufferedReader(
                new InputStreamReader(System.in));
            System.out.println("Press [Enter] to stop.");
            in.readLine();
        }
        finally
        {
            // Stop the lister even if reading from the console fails;
            // otherwise its thread would keep the application alive
            rdl.stop();
        }
    }
}
import java.io.*;
import java.io.*;
import java.util.*;
/**
 * A Runnable that will save a directory listing to the file system.
 *
 * The listing of the current working directory is written to the file
 * "directory.snp". If that file can't be opened, the listing is written
 * to System.out instead (and System.out is left open).
 *
 * @author  Prof. David Bernstein, James Madison University
 * @version 1.0
 */
public class DirectoryLister implements Runnable
{
    /**
     * The code to execute (i.e., write one snapshot).
     */
    public void run()
    {
        File        wd = new File(".");
        PrintStream out;
        boolean     ownsStream;

        // Create the PrintStream for the snapshot
        try
        {
            out = new PrintStream(new File("directory.snp"));
            ownsStream = true;
        }
        catch (FileNotFoundException fnfe)
        {
            out = System.out;
            ownsStream = false;
        }

        try
        {
            // Print the header
            out.println("Contents of " + wd + " at " + new Date());

            // Create the snapshot (listFiles() returns null if the
            // directory can't be read, in which case only the header is
            // written)
            File[] listing = wd.listFiles();
            if (listing != null)
            {
                for (int i=0; i<listing.length; i++)
                {
                    out.println(listing[i]);
                }
            }
        }
        finally
        {
            // Never close System.out; this task may be run repeatedly
            if (ownsStream) out.close();
            else            out.flush();
        }
    }
}
ScheduledExecutorService
import java.io.*;
import java.util.concurrent.*;
/**
 * A utility application that repeatedly saves a snapshot of the files in the
 * current directory.
 *
 * This version uses a DirectoryLister and a ScheduledExecutorService.
 * @author  Prof. David Bernstein, James Madison University
 * @version 2
 */
public class AutoDir
{
    /**
     * The entry point of the application.
     *
     * args[0] contains the delay between snapshots in milliseconds
     * (the default is 1000).
     *
     * @param args  The command-line arguments.
     * @throws IOException            If the console can't be read
     * @throws NumberFormatException  If args[0] is not a valid long
     */
    public static void main(String[] args) throws IOException
    {
        // Process the command-line arguments
        long delay = 1000;
        if ((args != null) && (args.length > 0))
        {
            delay = Long.parseLong(args[0]);
        }

        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try
        {
            // Construct and start the DirectoryLister
            scheduler.scheduleAtFixedRate(new DirectoryLister(), 0, delay,
                                          TimeUnit.MILLISECONDS);

            // The command-line "user interface" (such as it is)
            BufferedReader in = new BufferedReader(
                new InputStreamReader(System.in));
            System.out.println("Press [Enter] to stop.");
            in.readLine();
        }
        finally
        {
            // Shut down even if reading from the console fails; otherwise
            // the pool's thread would keep the application alive
            scheduler.shutdownNow();
        }
    }
}
Runnable is void, the
tasks can't return anything (except indirectly through reference
parameters that are passed in)
get() - wait for the task to complete
(perhaps for a maximum amount of time)
isDone() - determine if the task is complete
cancel() - attempt to cancel the task
isCancelled() - determine if the task was cancelled
before completion
Future can be used like a
latch
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
/**
 * A Callable that will save a directory listing to the file system.
 *
 * This version implements the Callable interface so that it can return
 * the number of files in the directory.
 *
 * The listing of the current working directory is written to the file
 * "directory.snp". If that file can't be opened, the listing is written
 * to System.out instead (and System.out is left open).
 *
 * @author  Prof. David Bernstein, James Madison University
 * @version 3.0
 */
public class DirectoryLister implements Callable<Integer>
{
    /**
     * The code to execute (i.e., write one snapshot).
     *
     * @return  The number of entries in the directory
     * @throws IllegalStateException  If the directory can't be listed
     */
    public Integer call()
    {
        File        wd = new File(".");
        PrintStream out;
        boolean     ownsStream;

        // Create the PrintStream for the snapshot
        try
        {
            out = new PrintStream(new File("directory.snp"));
            ownsStream = true;
        }
        catch (FileNotFoundException fnfe)
        {
            out = System.out;
            ownsStream = false;
        }

        try
        {
            // Print the header
            out.println("Contents of " + wd + " at " + new Date());

            // Create the snapshot (listFiles() returns null if the
            // directory can't be read, so fail with a meaningful message)
            File[] listing = wd.listFiles();
            if (listing == null)
            {
                throw new IllegalStateException(
                    "Unable to list the contents of " + wd);
            }

            for (int i=0; i<listing.length; i++)
            {
                out.println(listing[i]);
            }

            return Integer.valueOf(listing.length);
        }
        finally
        {
            // Release the file in every case but never close System.out
            if (ownsStream) out.close();
            else            out.flush();
        }
    }
}
import java.io.*;
import java.util.concurrent.*;
/**
 * A utility application that repeatedly saves a snapshot of the files in the
 * current directory.
 *
 * This version schedules several DirectoryLister objects and waits for
 * them to complete. Also, this version can't be terminated.
 *
 * @author  Prof. David Bernstein, James Madison University
 * @version 3
 */
public class AutoDir
{
    /**
     * The entry point of the application.
     *
     * args[0] contains the delay between snapshots (in milliseconds) and
     * args[1] contains the number of snapshots. Both must be provided;
     * otherwise the defaults (60000 and 60) are used.
     *
     * @param args  The command-line arguments.
     */
    public static void main(String[] args) throws IOException
    {
        // Defaults: every sixty seconds, for an hour
        long delay = 60000;
        int  n     = 60;

        // Process the command-line arguments
        boolean hasBoth = (args != null) && (args.length > 1);
        if (hasBoth)
        {
            delay = Long.parseLong(args[0]);
            n     = Integer.parseInt(args[1]);
        }

        // Create the ExecutorService
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

        // Construct and schedule multiple DirectoryLister objects
        // (the k-th one starts after k delays)
        @SuppressWarnings("unchecked")
        ScheduledFuture<Integer>[] result = new ScheduledFuture[n];
        for (int k = 0; k < n; k++)
        {
            DirectoryLister lister = new DirectoryLister();
            result[k] = scheduler.schedule(lister, k*delay,
                                           TimeUnit.MILLISECONDS);
        }

        // Wait for and then print the number of files in the directory
        for (int k = 0; k < result.length; k++)
        {
            report(result[k]);
        }

        // Shutdown the ExecutorService
        scheduler.shutdownNow();
    }

    /**
     * Wait for one task to complete and then print its result
     * (or a notice that the result is unavailable).
     *
     * @param pending  The Future of the task to wait for
     */
    private static void report(Future<Integer> pending)
    {
        try
        {
            int count = pending.get().intValue();
            System.out.printf("Number of Files: %d\n", count);
        }
        catch (ExecutionException ee)
        {
            System.out.println("Number of Files: Unavailable\n");
        }
        catch (InterruptedException ie)
        {
            System.out.println("Number of Files: Unavailable\n");
        }
    }
}
Future
Future object can be useful even when
the task does not return anything since it provides other
functionality
Future object has a cancel()
method
Future object can be parameterized
using ? to indicate that we don't
care about the return type
Terminator Task
import java.util.concurrent.*;
/**
 * A Runnable that can be used to cancel a Future and shutdown the
 * ExecutorService.
 *
 * @author  Prof. David Bernstein, James Madison University
 * @version 1.0
 */
public class Terminator implements Runnable
{
    final ExecutorService service;
    final Future<?>       future;

    /**
     * Explicit Value Constructor.
     *
     * @param future   The Future to be cancelled.
     * @param service  The ExecutorService
     */
    public Terminator(Future<?> future, ExecutorService service)
    {
        this.future  = future;
        this.service = service;
    }

    /**
     * The code to execute. The Future is cancelled, the ExecutorService
     * is given up to two seconds to terminate, and is then forced to
     * shutdown.
     */
    public void run()
    {
        // Cancel the Future (interrupting the task if it is running)
        future.cancel(true);

        // Shutdown the ExecutorService
        try
        {
            service.shutdown();
            // NOTE(review): if this Terminator is itself running in
            // service, the service can't terminate while this call is
            // waiting, so the full timeout will elapse
            service.awaitTermination(2000, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException ie)
        {
            // Don't swallow the interrupt; restore the status so that
            // whoever is running this task can still see it
            Thread.currentThread().interrupt();
        }
        finally
        {
            service.shutdownNow();
        }
    }
}
Terminator Task
import java.io.*;
import java.util.concurrent.*;
/**
 * A utility application that repeatedly saves a snapshot of the files in the
 * current directory.
 *
 * This version runs for a specific amount of time; it does not have
 * a UI that can be used to shut it down.
 * @author  Prof. David Bernstein, James Madison University
 * @version 4
 */
public class AutoDir
{
    /**
     * The entry point of the application.
     *
     * args[0] contains the delay between snapshots and args[1] contains
     * the total running time (both in milliseconds). Both must be
     * provided; otherwise the defaults (60000 and 3600000) are used.
     *
     * @param args  The command-line arguments.
     */
    public static void main(String[] args) throws IOException
    {
        // Defaults: once per minute, for an hour
        long delay    = 60000;
        long duration = 3600000;

        // Process the command-line arguments
        boolean hasBoth = (args != null) && (args.length > 1);
        if (hasBoth)
        {
            delay    = Long.parseLong(args[0]);
            duration = Long.parseLong(args[1]);
        }

        // Construct the scheduler
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

        // Construct and schedule the DirectoryLister
        DirectoryLister lister = new DirectoryLister();
        Future<?> task = scheduler.scheduleAtFixedRate(lister, 0, delay,
                                                       TimeUnit.MILLISECONDS);

        // Construct and schedule the Terminator (which will cancel the
        // task and shutdown the scheduler after the duration has elapsed)
        Terminator terminator = new Terminator(task, scheduler);
        scheduler.schedule(terminator, duration, TimeUnit.MILLISECONDS);
    }
}
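Executor and ExecutorService:
Actions in a thread prior to the submission of a Runnable to an Executor
happen-before its execution begins
Actions in a thread prior to the submission of a Callable to an ExecutorService
happen-before its execution begins
Future:
Actions taken by the asynchronous computation represented by a
Future happen-before actions subsequent
to the retrieval of the result via Future.get()
in another thread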