Coordinating Threads
without Monitors/Intrinsic Locks |
Prof. David Bernstein |
Computer Science Department |
bernstdh@jmu.edu |
Lock
Interface:
lock()
- Acquire the lock when it is/becomes free
lockInterruptibly()
- Acquire the lock unless the
current Thread
is interruptedtryLock()
- Acquire the lock only if
it is free (perhaps within a given amount of time)unlock()
- Free the lockReentrantLock
Class:
Thread
that is waiting
to acquire a ReentrantLock
tryLock()
)
ReentrantLock
IdiomLock lock = new ReentrantLock(); // ... lock.lock(); try { // Update the mutable shared state } catch () // Handle possible exceptions { } finally { lock.unlock(); }
ReentrantLock
Objects:
ReentrantLock
Objects:
ReentrantReadWriteLock
Objects:
import java.util.*; import java.util.concurrent.locks.*; /** * A PresenceRecord is a "read mostly" record of presence. It can be * used to keep a record of objects and that can be accessed * simultaneously by many threads. * * @author Prof. David Bernstein, James Madison University * @version 1.0 */ public class PresenceRecord<E> { private final HashSet<E> set = new HashSet<E>(); private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final Lock r = lock.readLock(); private final Lock w = lock.writeLock(); /** * Add a given element to this PresenceRecord. * * @param e The element to add */ public boolean add(E e) { w.lock(); // This is a write operation try { return set.add(e); } finally { w.unlock(); } } /** * Clear this Presencerecord. */ public void clear() { w.lock(); // This is a write operation try { set.clear(); } finally { w.unlock(); } } /** * Returns true if this PresenceRecord contains the given element. * * @return true if the element is present; false otherwise */ public boolean contains(E e) { r.lock(); // This is a read operation try { return set.contains(e); } finally { r.unlock(); } } /** * Removes the given element from this PresenceRecord (if present). * * @return true if the element was present; false otherwise */ public boolean remove(E e) { w.lock(); // This is a write operation try { return set.remove(e); } finally { w.unlock(); } } }
int
named
count
await()
- causes the calling current thread to
enter the wait state until the count
reaches
0 (or it is interrupted, or a given amount of time has
elapsed)countDown()
- decreases the count
(releasing all waiting threads when the count reaches 0)
countDown()
can be called from any thread
(all that matters is the number of times it is called, not the
number of different threads that call it)import java.util.concurrent.*; import java.util.concurrent.atomic.*; /** * An example that illustrates the use of a CountDownLatch. * * @author Prof. David Bernstein, James Madison University * @version 1.0 */ public class LatchExample { /** * The entry point of the application. * * @param args The command-line arguments (which are ignored) */ public static void main(String[] args) { Algorithm[] algorithm; AtomicInteger result = new AtomicInteger(-1); CountDownLatch latch = new CountDownLatch(1); // Construct multiple algorithms to solve the same problem algorithm = new Algorithm[2]; algorithm[0] = new GreedyAlgorithm(result, latch); algorithm[1] = new DynamicProgrammingAlgorithm(result, latch); // Start the algorithms for (int i=0; i<algorithm.length; i++) { (new Thread(algorithm[i])).start(); } try { // Wait for either of the two algorithms to finish latch.await(); // Get the result and continue processing int i = result.get(); System.out.printf("Using %d\n", i); } catch (InterruptedException ie) { // Respond accordingly } } }
join()
methods in the Thread
classRunnable
Used in the Example/** * An Averager object calculates the moving average of (either all or * part of) a data array. * * Because this class implements the Runnable interface the code that * performs the calculations can be executed in a separate thread. * * Note: If constructed properly (if they work with disjoint subsets * of the array), multiple Averager objects can perform the * calculations on the same data array. This is because, though multiple * threads may read the same elements of the data array, multiple threads * will not write the same element. (Of course, this assumes that the * data array is not changed in any other thread after it is constructed.) * * @author Prof. David Bernstein, James Madison University * @version 1.0 */ public class Averager implements Runnable { private double[] data, result; private int first, length, window; /** * Explicit Value Constructor. * * @param window The number of elements in the average * @param data The data * @param first The first element in the data array to process * @param length The number of elements in the data array to process * @param result The array that will hold the moving average */ public Averager(int window, double[] data, int first, int length, double[] result) { this.window = window; this.data = data; this.first = first; this.length = length; this.result = result; } /** * Calculate the moving average. */ public void run() { // For each appropriate element in the data array int n = first+length; int start = Math.max(first, window-1); for (int i=start; i<n; i++) { // Calculate the average of the "previous" (including // the current index) window number of elements double total = 0; int m = i - window + 1; for (int j=i; j>=m; j--) { total += data[j]; } result[i] = total / (double)window; } } }
/** * An example that uses a multiple threads to calculate the moving average * of a data array. * * @author Prof. David Bernstein, James Madison University * @version 1.0 */ public class MultiThreadedMovingAverageExample extends AbstractMovingAverageExample { /** * The entry point of the application. * * @param args The command-line arguments (which are ignored) */ public static void main(String[] args) { double[] data = createData(); double[] ma = new double[data.length]; Thread[] worker = new Thread[THREADS]; // Create multiple threads to do the work and start them int work = ARRAY_LENGTH / THREADS; for (int i=0; i<THREADS; i++) { Averager averager = new Averager(WINDOW, data, i*work, work, ma); worker[i] = new Thread(averager); worker[i].start(); } // Wait for all of the threads to terminate for (int i=0; i<worker.length; i++) { try { worker[i].join(); } catch (InterruptedException ie) { } } // Calculate the range of the moving average in the main thread RangeFinder rf = new RangeFinder(ma, WINDOW, ARRAY_LENGTH-WINDOW); rf.run(); System.out.println("main thread terminating..."); } }
join()
waits for the thread to terminate; barriers
can be used at any pointint
named
parties
and an optional Runnable
named barrierAction
(that will be executed in
on of the threads)await()
- causes the calling current thread to
enter the wait state until the count
reaches
0 (or it is interrupted, or a given amount of time has
elapsed)getNumberWaiting()
- returns the number of
parties currently waiting at the barrier point
isBroken()
- returns true
if
a call to await()
times-out or is interrupted
reset()
- resets the barrier to its initial state
Runnable
Objectsimport java.util.concurrent.*; /** * A decorator of a Runnable that can be used to coordinate * multiple threads of execution. */ public class Barriered implements Runnable { private CyclicBarrier barrier; private Runnable decorated; public Barriered(Runnable decorated, CyclicBarrier barrier) { this.decorated = decorated; this.barrier = barrier; } public void run() { decorated.run(); try { barrier.await(); } catch (InterruptedException ie) { } catch (BrokenBarrierException be) { } } }
import java.util.concurrent.*; /** * An example that uses a CyclicBarrier in the calculation of * moving averages. * * @author Prof. David Bernstein, James Madison University * @version 1.0 */ public class BarrierMovingAverageExample extends AbstractMovingAverageExample { /** * The entry point of the application. * * @param args The command-line arguments (which are ignored) */ public static void main(String[] args) { double[] data = createData(); double[] ma = new double[data.length]; // Create a CyclicBarrier (that will calculate the range when reached) RangeFinder rf = new RangeFinder(ma, WINDOW, ARRAY_LENGTH-WINDOW); CyclicBarrier barrier = new CyclicBarrier(THREADS, rf); // Create multiple threads to do the work and start them int work = ARRAY_LENGTH / THREADS; for (int i=0; i<THREADS; i++) { Averager averager = new Averager(WINDOW, data, i*work, work, ma); Barriered b = new Barriered(averager, barrier); Thread worker = new Thread(b); worker.start(); } System.out.println("main thread terminating..."); } }
acquire()
methods are given permits on a
first-in-first-out basisLock
, Semaphore
, CountDownLatch
:
CyclicBarrier
, Phaser
: