JMU
UDP for Text Transport
An Introduction with Examples in Java


Prof. David Bernstein
James Madison University

Computer Science Department
bernstdh@jmu.edu


User Datagram Protocol (UDP)
An Overview of UDP in Java
Sending Text Datagrams

Use any Available Port on the Sender

javaexamples/internet/UDPMessageSender.java (Fragment: 1)
               // Construct a DatagramSocket using any available port
               // (the no-argument constructor lets the system pick the local
               // port; the destination port is supplied per packet instead)
       ds = new DatagramSocket();
        
Sending Text Datagrams (cont.)

The Process

javaexamples/internet/UDPMessageSender.java (Fragment: 2)
               // Working storage: the encoded message and the packet that carries it
               byte[]                data;
       DatagramPacket        dp;


       try
       {
          // Convert the String to a byte array
          // (getBytes() with no argument uses the platform's default
          // charset, so the receiver must decode with the same charset)
          data = message.getBytes();
           
          // Construct a packet (that includes the address and port)
          // -- each datagram carries its own destination
          dp = new DatagramPacket(data,data.length,
                                  ipAddress,port);
           
          // Send the packet
          ds.send(dp);
       } 
       catch (IOException ioe) 
       {
          // The failure is deliberately ignored here, so the message
          // is simply lost.
          // To debug: ioe.printStackTrace();
       }
        
Receiving Text Datagrams

Must Listen to a Particular Port

javaexamples/internet/UDPMessageReceiver.java (Fragment: 1)
               // Bind the socket to the specific local port to listen on
               ds = new DatagramSocket(port); // throws SocketException (a kind of IOException)
        
Receiving Text Datagrams (cont.)

The Process

javaexamples/internet/UDPMessageReceiver.java (Fragment: 2)
               // Working storage for one incoming datagram
               byte[]           buffer;
       DatagramPacket   in;
       int              bufferLength;
       String           line;
       
       
       // The largest message (in bytes) that can be received; per the
       // DatagramSocket documentation, a longer datagram is truncated
       bufferLength = 255;
          try
          {
             // Construct an empty byte array
             buffer = new byte[bufferLength];

             // Construct an empty DatagramPacket
             in = new DatagramPacket(buffer, buffer.length);

             // Block until an incoming packet is received,
             // then fill the byte array
             ds.receive(in);
             
             // Construct a String from the byte array
             // (only the first getLength() bytes were received; the
             // platform's default charset is used for decoding)
             line=new String(in.getData(), 0, in.getLength());

          }
          catch (IOException ioe)
          {
             // receive() had a problem.  Re-try.
          }
        
Be Careful!
Putting it All Together

A UDPMessageSender

javaexamples/internet/UDPMessageSender.java
        package internet;

import java.io.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.*;


/**
 * Sends text messages to a UDP port in a separate thread of execution.
 *
 * Messages are queued by send() and transmitted, in order, by the
 * control thread that is created in start().  Each message is encoded
 * as UTF-8 and sent in a single datagram (so the receiver must decode
 * the bytes as UTF-8).
 *
 * @version 1.0
 * @author  Prof. David Bernstein, James Madison University
 */
public class UDPMessageSender implements Runnable
{
    // The encoding used "on the wire".  It is stated explicitly so that
    // the bytes do not depend on the sending machine's default charset.
    private static final java.nio.charset.Charset ENCODING =
       java.nio.charset.Charset.forName("UTF-8");

    // These attributes are written in one thread (by the caller) and
    // read in another (the control thread), hence they are volatile
    private volatile boolean              keepRunning;
    private volatile InetAddress          ipAddress;
    private volatile int                  port;
    private volatile Thread               controlThread;

    private final DatagramSocket          ds;
    private final List<String>            messageQueue;    
    private final Object                  signal;    

    

    /**
     * Explicit Value Constructor (that uses port 9001)
     *
     * @param address   The IP address to send to
     * @throws SocketException if the socket could not be opened
     */
    public UDPMessageSender(InetAddress address) 
       throws SocketException
    {
       this(address, 9001);
    }

    /**
     * Explicit Value Constructor
     *
     * @param address   The IP address to send to
     * @param port      The UDP port to send to
     * @throws SocketException if the socket could not be opened
     */
    public UDPMessageSender(InetAddress address, 
                            int port) 
       throws SocketException
    {
       // Remember the destination address and port
       ipAddress = address;
       this.port = port;
       
       // Construct a DatagramSocket using any available port
       ds = new DatagramSocket();

       // Construct the message queue
       // (If needed, one could use a java.util.concurrent.BlockingQueue)
       messageQueue = Collections.synchronizedList(new LinkedList<String>());

       // Construct the Object that is used to signal the 
       // state of the messageQueue
       signal = new Object();
    }


    /**
     * The code that is executed in the controlThread object's
     * thread of execution.
     *
     * It repeatedly drains the queue and then waits to be notified
     * that something has been added (or that stop() has been called).
     */
    public void run()
    {
       String            message;
       

       try
       {
          while (keepRunning)
          {
             while (messageQueue.size() > 0)
             {
                // Remove the message at the front of the queue (Note:
                // Since messageQueue is "wrapped", unsynchronized access
                // is prevented and this is the only thread that removes)
                message = messageQueue.remove(0);
                transmit(message);
             }
             
             // Enter the wait state until the messageQueue isn't empty
             synchronized(signal)
             {
                try
                {
                   // Check to make sure nothing has been added to the
                   // queue in another thread (send() adds while holding
                   // the same monitor so a notification can't be missed)
                   if (messageQueue.size() == 0) 
                   {
                      signal.wait();
                   }
                }
                catch (InterruptedException ie)
                {
                   // This probably means stop() was called.  If not,
                   // just try again (keepRunning is checked next).
                }
             }
          }
       }
       finally
       {
          // Always allow start() to be called again, even if this
          // thread terminated because of an unexpected exception
          controlThread = null;
       }
    }


    /**
     * Send a message.
     *
     * Actually, this method puts the message in a queue.  The
     * message will actually be sent in another thread of execution.
     * A null message is ignored (since it can't be encoded and
     * would otherwise terminate the sending thread).
     *
     * @param message  The message to send
     */
    public void send(String message)
    {
       if (message == null) return;

       // Notify other threads that the messageQueue has changed
       synchronized(signal)
       {
          // Add to the end of the queue (Note: Since messageQueue is
          // "wrapped", unsynchronized access is prevented).
          messageQueue.add(message);

          // It is possible that the message was already handled in
          // another thread making this notification extraneous, but
          // it doesn't hurt.  (We could, instead, check
          // messageQueue.size() first.)
          signal.notifyAll();
       }
    }
    

    /**
     * Set the IP address to send to
     *
     * @param address   The new IP address
     */
    public void setAddress(InetAddress address)
    {
       this.ipAddress = address;
    }




    /**
     * Set the port number to send to
     *
     * @param port   The new port number
     */
    public void setPort(int port)
    {
       this.port = port;
    }


    /**
     * Start the thread of execution that actually sends
     * the messages (if it isn't already running)
     */
    public void start()
    {
       if (controlThread == null)
       {
          Thread   thread;

          thread = new Thread(this);
          keepRunning = true;
          controlThread = thread;

          thread.start();
       }
    }
    


    /**
     * Stop the thread of execution (after it finishes sending
     * the message it is working on).
     *
     * Calling this method before start(), or after the thread
     * has already terminated, is harmless.
     */
    public void stop()
    {
       Thread   thread;

       keepRunning = false;

       // Use a local copy because run() sets controlThread to null
       // (in another thread) when it terminates
       thread = controlThread;

       // Force the thread out of the wait state (if necessary)
       if (thread != null) thread.interrupt();
    }


    /**
     * Actually transmit a message (in a single datagram)
     *
     * @param message   The message
     */
    private void transmit(String message)
    {
       byte[]                data;
       DatagramPacket        dp;


       try
       {
          // Convert the String to a byte array (using an explicit
          // encoding rather than the platform default)
          data = message.getBytes(ENCODING);
           
          // Construct a packet (that includes the address and port)
          dp = new DatagramPacket(data,data.length,
                                  ipAddress,port);
           
          // Send the packet
          ds.send(dp);
       } 
       catch (IOException ioe) 
       {
          // UDP is "best effort" so a message that can't be sent
          // is intentionally dropped.
          // To debug: ioe.printStackTrace();
       }
    }
}
        
Putting it All Together (cont.)

A UDPMessageReceiver

javaexamples/internet/MessageListener.java
        package internet;

/**
 * The requirements of an observer that wants to be informed
 * when a text message arrives
 *
 * @version 1.0
 * @author  Prof. David Bernstein, James Madison University
 */
public interface MessageListener
{
    /**
     * Respond to a text message that has arrived
     *
     * @param text   The content of the message
     */
    void handleMessage(String text);
}
        
javaexamples/internet/UDPMessageReceiver.java
        package internet;

import java.io.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.*;


/**
 * Receives text messages (with a maximum length of 255 bytes) on a
 * UDP port and informs observers of those messages in a separate
 * thread of execution.
 *
 * Incoming bytes are decoded as UTF-8 (so the sender must encode
 * its messages as UTF-8).  Datagrams that are longer than the maximum
 * length are truncated by the socket.
 *
 * @version 1.0
 * @author  Prof. David Bernstein, James Madison University
 */
public class UDPMessageReceiver implements Runnable
{
    // The encoding used "on the wire".  It is stated explicitly so that
    // decoding does not depend on this machine's default charset.
    private static final java.nio.charset.Charset ENCODING =
       java.nio.charset.Charset.forName("UTF-8");

    // The size of the receive buffer (i.e., the longest message in bytes)
    private static final int MAX_MESSAGE_LENGTH = 255;

    // These attributes are written in one thread and read in another
    private volatile boolean                            keepRunning;
    private volatile Thread                             controlThread;

    private final DatagramSocket                        ds;
    private final CopyOnWriteArrayList<MessageListener> listeners;
    private final int                                   port;


    /**
     * Explicit Value Constructor
     *
     * @param port       The UDP port to listen to
     * @throws IOException if the socket can't be opened or configured
     */
    public UDPMessageReceiver(int port) throws IOException
    {
       DatagramSocket     socket;

       this.port = port;
       keepRunning = true;
       
       socket = new DatagramSocket(port); // throws SocketException
       try
       {
          // Make sure that receive() times-out so the thread can be stopped
          socket.setSoTimeout(5000);
       }
       catch (SocketException se)
       {
          // Don't leave the port bound if construction fails
          socket.close();
          throw se;
       }
       ds = socket;

       // A CopyOnWriteArrayList is used because: we need to preclude
       // interference across threads, iterations vastly outnumber
       // mutations, and we don't want to synchronize iterations
       listeners = new CopyOnWriteArrayList<MessageListener>();
    }

                            

    /**
     * Add a MessageListener
     *
     * @param listener   The MessageListener to add
     */
    public void addMessageListener(MessageListener listener)
    {
       // This may be slow (because listeners is a CopyOnWriteArrayList)
       // but shouldn't happen frequently
       listeners.add(listener);
    }
    


    /**
     * Notify all MessageListener objects 
     *
     * @param message   The text message
     */
    public void notifyListeners(String message)
    {
       // The iteration operates on a snapshot so listeners can be
       // added/removed (even by a listener) while it is in progress
       for (MessageListener listener: listeners)
       {
          listener.handleMessage(message);
       }
    }



    /**
     * Remove a MessageListener
     *
     * @param listener   The MessageListener to remove
     */
    public void removeMessageListener(MessageListener listener)
    {
       // This may be slow (because listeners is a CopyOnWriteArrayList)
       // but shouldn't happen frequently
       listeners.remove(listener);
    }



    /**
     * Run this Receiver (i.e., repeatedly wait for a datagram, 
     * decode it, and notify the listeners) until stop() is called
     */
    public void run()
    {
       byte[]           buffer;
       DatagramPacket   in;
       String           line;
       
       
       // The buffer can be re-used because each String that is
       // constructed from it has its own copy of the characters
       buffer = new byte[MAX_MESSAGE_LENGTH];

       try
       {
          while (keepRunning)
          {
             try
             {
                // Construct an empty DatagramPacket (a new one is used
                // each time because receive() changes its length)
                in = new DatagramPacket(buffer, buffer.length);

                // Block until an incoming packet is received (or the
                // time-out expires), then fill the byte array
                ds.receive(in);
             
                // Construct a String from the bytes that were received
                line = new String(in.getData(), in.getOffset(), 
                                  in.getLength(), ENCODING);

                // Notify all listeners that a packet has arrived
                notifyListeners(line);
             }
             catch (SocketTimeoutException ste)
             {
                // receive() timed-out.  Check keepRunning and proceed.
             }
             catch (IOException ioe)
             {
                // receive() had a problem.  Re-try.
             }
          }
       }
       finally
       {
          // Always allow start() to be called again, even if a
          // listener threw an unchecked exception
          controlThread = null;       
       }
    }


    /**
     * Start the thread of execution that actually receives
     * the messages (if it isn't already running)
     */
    public void start()
    {
       if (controlThread == null)
       {
          Thread   thread;

          thread = new Thread(this);
          keepRunning = true;
          controlThread = thread;

          thread.start();
       }
    }


    /**
     * Stop the thread of execution (after it finishes receiving
     * the current message or receive() times-out)
     */
    public void stop()
    {
       keepRunning = false;
    }

}
        
An Example
javaexamples/map1/UDPPositionServer.java
        import java.io.*;
import java.net.*;
import java.util.*;

import internet.*;

/**
 * An application that reads position reports from a file
 * (named track.txt in the working directory) and transmits
 * them, one per second, using UDP.  When the end of the track
 * is reached it starts over at the beginning.
 *
 * Usage: java UDPPositionServer host port
 *
 * @version 1.0
 * @author  Prof. David Bernstein, James Madison University
 */
public class UDPPositionServer
{

    /**
     * The entry point of the application
     *
     * @param args   The host to report to and the port to report to
     */
    public static void main(String[] args)
    {
       InetAddress           ipAddress;
       int                   i, port;
       List<String>          track;
       UDPMessageSender      sender;


       if (args.length < 2)
       {
          System.err.println("Usage: java UDPPositionServer host port");
          System.exit(1);
          return;
       }

       // Read the position reports (reporting problems with the file
       // separately from problems with the network)
       try
       {
          track = readTrack("track.txt");
       }
       catch (IOException ioe)
       {
          System.err.println("Unable to read track.txt: "+ioe);
          System.exit(1);
          return;
       }

       if (track.isEmpty())
       {
          System.err.println("No position reports in track.txt");
          System.exit(1);
          return;
       }

       try 
       {
          ipAddress = InetAddress.getByName(args[0]);
          port      = Integer.parseInt(args[1]);
          sender = new UDPMessageSender(ipAddress, port);
          sender.start(); // Never stopped because the loop below never ends

          System.out.println("Reporting to "+args[0]+
                             " on port "+port+"\n");

          i = 0;
          while (true)
          {
                
             try
             {
                Thread.sleep(1000);

                sender.send(track.get(i));

                // Advance (wrapping around only after the last
                // report has actually been sent)
                i = (i + 1) % track.size();
             } 
             catch (InterruptedException ie) 
             {
                // Do nothing (the same report is tried again)
             }
          }
       }
       catch (NumberFormatException nfe)
       {
          System.err.println("Invalid port: "+args[1]);
          System.exit(1);
       }
       catch (UnknownHostException uhe)
       {
          System.err.println("Unknown host: "+uhe);
          System.exit(1);
       }
       catch (IOException ioes)
       {
          System.err.println("Unable to open socket: "+
                             ioes);
          System.exit(1);

       }
    }


    /**
     * Read all of the lines in a file (closing it when done)
     *
     * @param fileName   The name of the file
     * @return           The lines (in the order they appear in the file)
     * @throws IOException if the file can't be opened or read
     */
    private static List<String> readTrack(String fileName) 
       throws IOException
    {
       BufferedReader        in;
       List<String>          lines;
       String                line;


       lines = new ArrayList<String>();
       in    = new BufferedReader(new FileReader(fileName));
       try
       {
          while ((line = in.readLine()) != null)
          {
             lines.add(line);
          }
       }
       finally
       {
          in.close();
       }

       return lines;
    }

}
        
javaexamples/map1/AbstractPositionSubject.java
        import java.awt.Point;
import java.io.*;
import java.util.*;


/**
 * An abstract "generator" of position reports.
 *
 * It manages a collection of PositionListener objects and
 * provides the method that notifies them.  The methods in this class
 * guard the collection with a lock, and notifications are delivered 
 * to a snapshot of the collection, so listeners can be added/removed
 * while a notification is in progress (even by a listener).
 *
 * @version 1.0
 * @author  Prof. David Bernstein, James Madison University
 *
 */
public abstract class AbstractPositionSubject implements PositionSubject
{
    // Note: Code in derived classes that uses this attribute directly
    // is not protected by the lock that is used in this class
    protected HashSet<PositionListener>  listeners;

    // Guards access to listeners in the methods of this class
    private final Object                 listenersLock = new Object();


    /**
     * Default Constructor
     */
    public AbstractPositionSubject()
    {
       listeners = new HashSet<PositionListener>();
    }




    /**
     * Add a position listener
     *
     * @param listener   The PositionListener to add
     */
    public void addPositionListener(PositionListener listener)
    {
       synchronized(listenersLock)
       {
          listeners.add(listener);
       }
    }




    /**
     * Notify all position listeners of a position report
     *
     * @param position   The position to report
     */
    public void notifyListeners(Point position)
    {
       PositionListener[]    snapshot;

       // Copy the collection while holding the lock...
       synchronized(listenersLock)
       {
          snapshot = listeners.toArray(
             new PositionListener[listeners.size()]);
       }

       // ...but call the listeners without holding it (so that a
       // listener can add/remove listeners without causing a
       // ConcurrentModificationException or a deadlock)
       for (PositionListener listener: snapshot)
       {
          listener.handlePositionReport(position);
       }
    }




    /**
     * Remove a position listener
     *
     * @param listener   The PositionListener to remove
     */
    public void removePositionListener(PositionListener listener)
    {
       synchronized(listenersLock)
       {
          listeners.remove(listener);
       }
    }


}
        
javaexamples/map1/UDPPositionReceiver.java
        import java.awt.*;
import java.io.*;
import java.util.*;

import internet.*;

/**
 * A class for receiving position reports using UDP.
 *
 * Each message is expected to contain two comma-delimited 
 * integers (the x and y coordinates).  Messages that can't be 
 * parsed are ignored.
 *
 * @version 1.0
 * @author  Prof. David Bernstein, James Madison University
 */
public class UDPPositionReceiver extends    AbstractPositionSubject
                                 implements MessageListener
{
    private UDPMessageReceiver   receiver;


    /**
     * Explicit Value Constructor
     *
     * @param port  The UDP port to listen to
     * @throws IOException if the UDPMessageReceiver can't be constructed
     */
    public UDPPositionReceiver(int port) throws IOException
    {
       super();

       receiver = new UDPMessageReceiver(port);

       // Register before starting so that messages that arrive
       // immediately after the thread starts aren't missed
       receiver.addMessageListener(this);
       receiver.start(); // It should also be stopped (see stop())
    }
                            

    /**
     * Handle text messages containing position reports
     *
     * @param line   The position message (e.g., "12,34")
     */
    public void handleMessage(String line)
    {
       int              x, y;
       StringTokenizer  st;


       // There is nothing to parse
       if (line == null) return;

       try 
       {
          st = new StringTokenizer(line,",");

          x = Integer.parseInt(st.nextToken().trim());
          y = Integer.parseInt(st.nextToken().trim());
            
          notifyListeners(new Point(x,y));
       } 
       catch (NoSuchElementException nsee) 
       {
          // Bad position (fewer than two tokens) so ignore it
       }
       catch (NumberFormatException nfe) 
       {
          // Bad position (a token isn't an integer) so ignore it
       }
    }


    /**
     * Stop receiving position reports (by stopping the 
     * underlying UDPMessageReceiver)
     */
    public void stop()
    {
       receiver.stop();
    }
}
        
javaexamples/map1/TrackingCanvas.java
        import java.awt.*;

/**
 * A canvas for drawing road maps and plotting 
 * vehicle tracks on them
 *
 * @version 1.0
 * @author  Prof. David Bernstein, James Madison University
 *
 */
public class TrackingCanvas extends    RoadMapCanvas
   implements PositionListener
{
    private Color vehicleColor;

    /**
     * Construct a new TrackingCanvas (that plots vehicles in magenta)
     */
    public TrackingCanvas()
    {
       super();
       vehicleColor = new Color(255,0,255);
    }




    /**
     * Handle a position report (by plotting it)
     *
     * @param position  The position report
     */
    public void handlePositionReport(Point position)
    {
       plotPosition(position);
    }



    /**
     * Plots the position of a vehicle on the map (as a 5x5 
     * square centered on the position).
     *
     * Nothing is plotted if a graphics context isn't available.
     *
     * @param p  The position
     */
    protected void plotPosition(Point p)
    {
       Graphics   g;

       // NOTE(review): This assumes that getGraphics() is inherited from
       // java.awt.Component (through RoadMapCanvas), in which case it
       // returns null when the component isn't displayable, and the
       // object it returns should be disposed of by the caller -- confirm
       g = getGraphics();
       if (g == null) return;

       try
       {
          g.setColor(vehicleColor);

          g.fillRect(p.x-2, p.y-2, 5, 5);
       }
       finally
       {
          // Release the system resources used by the graphics context
          g.dispose();
       }
    }
    
}

        
UDP Multicast