Threads
An Introduction with Examples in Java |
Prof. David Bernstein |
Computer Science Department |
bernstdh@jmu.edu |
Dispatcher
import java.util.*; /** * Used to dispatch vehicles in a fleet (e.g., emergency * response vehicles). * * @author Prof. David Bernstein, James Madison University * @version 1.0 */ public class Dispatcher { protected int numberOfVehicles; protected LinkedList<Integer> availableVehicles; /** * Construct a new Dispatcher * * @param n The number of vehicles in the fleet */ public Dispatcher(int n) { int i; numberOfVehicles = n; availableVehicles = new LinkedList<Integer>(); for (i=0; i < n; i++) { makeVehicleAvailable(i); } } /** * Dispatch an available vehicle * * @param task The task to be handled * @return true if a vehicle was dispatched */ public boolean dispatch(String task) { boolean ok; int vehicle; Integer v; ok = false; if (availableVehicles.size() > 0) { v = availableVehicles.removeFirst(); vehicle = v.intValue(); sendMessage(vehicle, task); ok = true; } else { ok = false; } return ok; } /** * Makes a vehicle available for future dispatching * * @param vehicle The number of the vehicle */ public void makeVehicleAvailable(int vehicle) { availableVehicles.addLast(new Integer(vehicle)); } /** * Sends a message to a vehicle * * @param vehicle The number of the vehicle * @param message The message to send */ private void sendMessage(int vehicle, String message) { // This method would normally transmit the message // to the vehicle. For simplicity, it now writes it // to the screen instead. System.out.println(vehicle+"\t"+message+"\n"); System.out.flush(); } }
AbstractDispatchHandler
import java.io.*; import java.util.*; /** * An abstract dispatch handler. * * @author Prof. David Bernstein, James Madison University * @version 1.0 */ public abstract class AbstractDispatchHandler { protected BufferedReader in; protected Dispatcher dispatcher; /** * Construct a new DailyDispatchHandler * * @param d The Dispatcher to use * @param f The name of the dispatch file */ public AbstractDispatchHandler(Dispatcher d, BufferedReader in) { dispatcher = d; this.in = in; } /** * Process an individual dispatch. * * @param line The dispatch */ protected abstract void processDispatch(String line); /** * Process the daily dispatches * * Note: This method will continue to execute until * all vehicles have been dispatched. This could take * all day. The application will not be able to * do anything else while this method is executing. */ public void processDispatches() { String line; do { try { line = in.readLine(); if (line != null) processDispatch(line); } catch (IOException ioe) { line = ""; System.err.println("Unable to read a dispatch"); } } while (line != null); } /** * Start this DailyDispatchHandler * * Specifically, process the daily dispatches */ public void start() { processDispatches(); try { in.close(); } catch (IOException ioeClose) { System.err.println("Unable to close dispatches"); } } }
DailyDispatchHandler
import java.io.*; import java.util.*; /** * A class that reads in the daily list of dispatching * tasks from a file and hands them off to a Dispatcher * * Note: While this class' processDispatches() * method is executing, the application * will not be able to do anything else. * * @author Prof. David Bernstein, James Madison University * @version 1.0 */ public class DailyDispatchHandler extends AbstractDispatchHandler { private long currentTime, lastTime; /** * Construct a new DailyDispatchHandler * * @param d The Dispatcher to use * @param f The name of the dispatch file */ public DailyDispatchHandler(Dispatcher d, String f) throws IOException { super(d, new BufferedReader(new FileReader(f))); lastTime = System.currentTimeMillis(); } /** * Process an individual dispatch (required by * AbstractDispatchHandler). * * @param line The dispatch */ public void processDispatch(String line) { int wait; String message; StringTokenizer st; st = new StringTokenizer(line,"\t"); wait = Integer.parseInt(st.nextToken()); message = st.nextToken(); // Wait until the appropriate time before dispatching while (System.currentTimeMillis()-lastTime < wait) { // Do nothing } dispatcher.dispatch(message); lastTime = System.currentTimeMillis(); } }
RealTimeDispatchHandler
import java.io.*; import java.util.*; /** * A class that reads in dispatching tasks from the console and hands * them off to a Dispatcher * * @author Prof. David Bernstein, James Madison University * @version 1.0 */ public class RealTimeDispatchHandler extends AbstractDispatchHandler { /** * Construct a new DailyDispatchHandler * * @param d The Dispatcher to use */ public RealTimeDispatchHandler(Dispatcher d) throws IOException { super(d, new BufferedReader(new InputStreamReader(System.in))); } /** * Process an individual dispatch (required by * AbstractDispatchHandler). * * @param line The dispatch */ public void processDispatch(String line) { dispatcher.dispatch(line); } }
import java.io.*; import java.util.Date; /** * The driver for an example that motivates the need for * multi-threaded applications * * @author Prof. David Bernstein, James Madison University * @version 1.0 */ public class Driver { /** * The entry point of the application * * @param args The command line arguments */ public static void main(String[] args) throws IOException { DailyDispatchHandler daily; Dispatcher dispatcher; RealTimeDispatchHandler rt; String userInput; dispatcher = new Dispatcher(50); try { daily = new DailyDispatchHandler(dispatcher, "dispatches.txt"); daily.start(); } catch (IOException ioe) { System.err.println("Unable to open daily dispatches."); } try { rt = new RealTimeDispatchHandler(dispatcher); rt.start(); } catch (IOException ioe) { System.err.println("Unable to read from the console."); } } }
RealTimeDispatchHandler
does not start
working until the DailyDispatchHandler
has completed its job
while
loop
in the processDispatches()
method
maintains control of the CPU until all
of the daily dispatches have been completedDispatcher
"at the same time"import java.io.*; import java.util.*; /** * An abstract dispatch handler. * * Note: This version starts a new thread of execution * to process the daily dispatches. * * @author Prof. David Bernstein, James Madison University * @version 2.0 */ public abstract class AbstractDispatchHandler implements Runnable { protected BufferedReader in; protected Dispatcher dispatcher; protected Thread controlThread; /** * Construct a new DailyDispatchHandler * * @param d The Dispatcher to use * @param f The name of the dispatch file */ public AbstractDispatchHandler(Dispatcher d, BufferedReader in) { dispatcher = d; this.in = in; } /** * Process an individual dispatch. * * @param line The dispatch */ protected abstract void processDispatch(String line); /** * Process the daily dispatches. * * Note: This method will continue to execute until * all vehicles have been dispatched. This could take * all day. The application will not be able to * do anything else while this method is executing. */ public void processDispatches() { String line; do { try { line = in.readLine(); if (line != null) processDispatch(line); } catch (IOException ioe) { line = ""; System.err.println("Unable to read a dispatch"); } } while (line != null); } /** * Start this AbstractDispatchHandler. * * Specifically, start a new thread of execution and process * the dispatches in this thread. */ public void start() { if (controlThread == null) { controlThread = new Thread(this); controlThread.start(); } } /** * The code that runs in this AbstractDispatchHandler * objects thread of execution (required by Runnable). */ public void run() { processDispatches(); try { in.close(); } catch (IOException ioeClose) { System.err.println("Unable to close dispatches"); } } }
Dispatcher
now has shared mutable state
DailyDispatchHandler
uses a tight loop
that wastes processor resourcesimport java.io.*; import java.util.*; /** * A class that reads in the daily list of dispatching * tasks from a file and hands them off to a Dispatcher. * * Note: This version sleeps between dispatches rather than * running in a tight loop. * * @author Prof. David Bernstein, James Madison University * @version 3.0 */ public class DailyDispatchHandler extends AbstractDispatchHandler { /** * Construct a new DailyDispatchHandler * * @param d The Dispatcher to use * @param f The name of the dispatch file */ public DailyDispatchHandler(Dispatcher d, String f) throws IOException { super(d, new BufferedReader(new FileReader(f))); } /** * Process an individual dispatch (required by * AbstractDispatchHandler). * * @param line The dispatch */ public void processDispatch(String line) { int wait; String message; StringTokenizer st; st = new StringTokenizer(line,"\t"); wait = Integer.parseInt(st.nextToken()); message = st.nextToken(); try { // Sleep the appropriate amount of time // before dispatching the vehicle. Other // threads can execute while this one // is sleeping. // controlThread.sleep(wait); dispatcher.dispatch(message); } catch (InterruptedException ie) { // Don't dispatch } } }
interrupt()
Method:
true
isInterrupted()
method of the
controlling Thread
objectDailyDispatchHandler
and
RealTimeDispatcher
run until EOS
boolean
attribute
called keepRunning
and change the do-while
loop
from do{...}while (line != null);
to
do{...}while (keepRunning && (line != null));
stop()
method that assigns
false
to keepRunning
keepRunning
(i.e., the thread that stop()
executes in)
and another "loads" values from it (i.e., the dispatch handler's
thread)
volatile
Modifier:
volatible
attribute always returns the
most recent write by any threadvolatile
reference types only provide this guarantee
for the referenec itself (e.g., not the elemnts of an array or
the attributes of an object)import java.io.*; import java.util.*; /** * An abstract dispatch handler. * * Note: This version allows the thread of execution * to be stopped (after the blocking I/O operation completes.) * * @author Prof. David Bernstein, James Madison University * @version 4.0 */ public abstract class AbstractDispatchHandler implements Runnable { protected volatile boolean keepRunning; protected BufferedReader in; protected Dispatcher dispatcher; protected Thread controlThread; /** * Construct a new DailyDispatchHandler * * @param d The Dispatcher to use * @param f The name of the dispatch file */ public AbstractDispatchHandler(Dispatcher d, BufferedReader in) { dispatcher = d; this.in = in; } /** * Process an individual dispatch. * * @param line The dispatch */ protected abstract void processDispatch(String line); /** * Process the daily dispatches. * * Note: This method will continue to execute until * all vehicles have been dispatched. This could take * all day. The application will not be able to * do anything else while this method is executing. */ public void processDispatches() { String line; do { try { line = in.readLine(); if (line != null) processDispatch(line); } catch (IOException ioe) { line = ""; System.err.println("Unable to read a dispatch"); } } while (keepRunning && (line != null)); } /** * Start this AbstractDispatchHandler. * * Specifically, start a new thread of execution and process * the dispatches in this thread. */ public void start() { if (controlThread == null) { controlThread = new Thread(this); keepRunning = true; controlThread.start(); } } /** * Stop this AbstractDispatchHandler. */ public void stop() { keepRunning = false; // Interrupt the thread of execution in case // it is in a wait state. controlThread.interrupt(); } /** * The code that runs in this AbstractDispatchHandler * objects thread of execution (required by Runnable). */ public void run() { processDispatches(); try { in.close(); } catch (IOException ioeClose) { System.err.println("Unable to close dispatches"); } } }
Dispatcher
has shared mutable state/** * Dispatch an available vehicle * * @param task The task to be handled * @return true if a vehicle was dispatched */ public boolean dispatch(String task) { boolean ok; int vehicle; Integer v; ok = false; if (availableVehicles.size() > 0) { v = availableVehicles.removeFirst(); vehicle = v.intValue(); sendMessage(vehicle, task); ok = true; } else { ok = false; } return ok; }
(availableVehicles.size() > 0)
and then runs out of time
public int getNextIndex() { return ++index; }
/** * An encapsulation of a MutableInteger that can be used to demonstrate * various problems that arise when using shared mutable state. * * @author Prof. David Bernstein, James Madison University * @version 1.0 */ public class NonatomicInteger implements MutableInteger { private int value; /** * Explicit Value Constructor. * * @param initial The initial value */ public NonatomicInteger(int initial) { set(initial); } /** * Get the current value of this MutableInteger. * * @return The current value */ public int get() { return value; } /** * Increment (and return) the value of this MutableInteger. * * @return The current value (i.e., after it is incremented) */ public int incrementAndGet() { ++value; return value; } /** * Set the value of this MutableInteger. * * @param value The new value */ public void set(int value) { this.value = value; } }
/** * An implementation of Runnable that increases a MutableInteger. * It can be used to illustrate problems that can arise when using shared * mutable state. * * @author Prof. David Bernstein, James Madison University * @version 1.0 */ public class Counter implements Runnable { private int iterations; private MutableInteger mutable; /** * Explicit Value Constructor. * * @param mutable The MutableInteger to use * @param iterations The number of times to increment the MutableInteger */ public Counter(MutableInteger mutable, int iterations) { this.mutable = mutable; this.iterations = iterations; } /** * The code to run in the thread of execution. */ public void run() { for (int i=0; i<iterations; i++) { mutable.incrementAndGet(); } System.out.printf("Value: %d\n", mutable.get()); } }
/** * An application that illustrates some of the problems that can arise * when using shared mutable state, and how synchronization can * prevent them. * * If the application is run with no command line arguments it will * use a NonatomicInteger. If any command line arguments are provided * it will use a SynchronizedInteger. * * @author Prof. David Bernstein, James Madison University * @version 1.0 */ public class CounterDriver { private static final int ITERATIONS = 1000000; private static final int THREADS = 5; /** * The entry point of the application. * * @param args The command line arguments */ public static void main(String[] args) { MutableInteger mutable; if ((args != null) && (args.length > 0)) mutable = new SynchronizedInteger(0); else mutable = new NonatomicInteger(0); for (int i=0; i<THREADS; i++) { Thread thread = new Thread(new Counter(mutable, ITERATIONS)); thread.start(); } } }
synchronized
method
or block if it can acquire the relevant monitorsynchronized
Methods:
synchronized
synchronized
Blocks:
synchronized
Object
whose monitor should be used
/** * An encapsulation of a MutableInteger that can be used to * demonstrate how synchronization can avoid the problems that arise * when using shared mutable state. * * @author Prof. David Bernstein, James Madison University * @version 1.0 */ public class SynchronizedInteger implements MutableInteger { private int value; /** * Explicit Value Constructor. * * @param initial The initial value */ public SynchronizedInteger(int initial) { set(initial); } /** * Get the current value of this MutableInteger. * * @return The current value */ public synchronized int get() { return value; } /** * Increment (and return) the value of this MutableInteger. * * @return The current value (i.e., after it is incremented) */ public synchronized int incrementAndGet() { ++value; return value; } /** * Set the value of this MutableInteger. * * @param value The new value */ public synchronized void set(int value) { this.value = value; } }
Dispatcher
/** * Dispatch an available vehicle * * @param task The task to be handled * @return true if a vehicle was dispatched */ public synchronized boolean dispatch(String task) { boolean ok; int vehicle; Integer v; ok = false; if (availableVehicles.size() > 0) { v = availableVehicles.removeFirst(); vehicle = v.intValue(); sendMessage(vehicle, task); ok = true; } else { ok = false; } return ok; }
dispatch()
method and the
makeVehicleAvailable()
method change the queue
makeVehicleAvailable()
Dispatcher
/** * Makes a vehicle available for future dispatching * * @param vehicle The number of the vehicle */ public synchronized void makeVehicleAvailable(int vehicle) { availableVehicles.addLast(new Integer(vehicle)); }
dispatch()
method is modified so
that, instead of returning false
if
no vehicles are available it loops until one is availabledispatch()
makeVehicleAvailable()
dispatch()
method
enter the waiting state if no vehicles are
availablemakeVehicleAvailable()
can be executed in
another threadmakeVehicleAvailable()
must notify the waiting
threads
Dispatcher
import java.util.*; /** * Used to dispatch vehicles in a fleet (e.g., emergency * response vehicles). * * In this version, dispatch() does not return true/false, it waits until * a vehicle is available. * * @author Prof. David Bernstein, James Madison University * @version 6.0a */ public class Dispatcher { protected int numberOfVehicles; protected LinkedList<Integer> availableVehicles; private Object lock; /** * Construct a new Dispatcher * * @param n The number of vehicles in the fleet */ public Dispatcher(int n) { int i; lock = new Object(); numberOfVehicles = n; availableVehicles = new LinkedList<Integer>(); for (i=0; i < n; i++) { makeVehicleAvailable(i); } } /** * Dispatch an available vehicle. * * @param task The task to be handled * @return true (to be consistent with earlier versions) */ public boolean dispatch(String task) { boolean ok; synchronized(lock) { do { ok = attemptToDispatch(task); if (!ok) { try { lock.wait(); } catch (InterruptedException ie) { // Ignore } } while (ok == false); } return true; } /** * Attempt to dispatch a vehicle. * * @param task The task. * @return true if the dispatch was successful; false otherwise */ private boolean attemptToDispatch(String task) { boolean ok; int vehicle; Integer v; ok = false; if (availableVehicles.size() > 0) { v = availableVehicles.removeFirst(); vehicle = v.intValue(); sendMessage(vehicle, task); ok = true; } return ok; } /** * Makes a vehicle available for future dispatching * * @param vehicle The number of the vehicle */ public void makeVehicleAvailable(int vehicle) { synchronized(lock) { availableVehicles.addLast(new Integer(vehicle)); lock.notifyAll(); } } /** * Sends a message to a vehicle * * @param vehicle The number of the vehicle * @param message The message to send */ private void sendMessage(int vehicle, String message) { // This method would normally transmit the message // to the vehicle. For simplicity, it now writes it // to the screen instead. System.out.println(vehicle+"\t"+message+"\n"); System.out.flush(); } }
keepRunning
before it was declared to be
volatile
)start()
on a thread happens-before
any action in the started threadjoin()
on that
thread