UDP for Text Transport
An Introduction with Examples in Java |
Prof. David Bernstein |
Computer Science Department |
bernstdh@jmu.edu |
try
{
    // Encode the text as bytes (getBytes() with no argument uses the
    // platform's default charset)
    byte[] payload = message.getBytes();

    // Wrap the bytes in a packet that carries the destination
    // address and port
    DatagramPacket packet = new DatagramPacket(payload, payload.length,
                                               ipAddress, port);

    // Hand the packet to the socket for delivery
    ds.send(packet);
}
catch (IOException ioe)
{
    // To debug: ioe.printStackTrace();
}
int    bufferLength = 255;
String line;

try
{
    // Allocate a fresh, empty buffer and wrap it in an empty packet
    // that receive() can fill
    byte[]         buffer = new byte[bufferLength];
    DatagramPacket in     = new DatagramPacket(buffer, buffer.length);

    // Block until an incoming packet is received
    ds.receive(in);

    // Build the text from the portion of the buffer that the packet
    // reports as its length
    line = new String(in.getData(), 0, in.getLength());
}
catch (IOException ioe)
{
    // receive() had a problem. Re-try.
}
The receive() method blocks
Make the receive() method time out periodically
if necessary (by calling the setSoTimeout()
method before receive())
UDPMessageSender
package internet;

import java.io.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.*;

/**
 * Sends text messages to a UDP port in a separate thread of execution.
 *
 * Messages handed to send() are placed in a queue; a single control
 * thread (created by start() and ended by stop()) removes them from the
 * front of the queue and transmits each one as its own datagram.
 *
 * @version 1.0
 * @author  Prof. David Bernstein, James Madison University
 */
public class UDPMessageSender implements Runnable
{
    private volatile boolean      keepRunning;
    private DatagramSocket        ds;

    // The destination can be changed by setAddress()/setPort() while the
    // control thread is reading it, so both fields are volatile
    private volatile InetAddress  ipAddress;
    private volatile int          port;

    private List<String>          messageQueue;
    private Object                signal;

    // Written by the control thread (when it ends) and read by
    // start()/stop() in other threads, so it must be volatile
    private volatile Thread       controlThread;

    /**
     * Explicit Value Constructor (uses port 9001)
     *
     * @param address   The IP address to send to
     * @throws SocketException if the DatagramSocket cannot be constructed
     */
    public UDPMessageSender(InetAddress address) throws SocketException
    {
        this(address, 9001);
    }

    /**
     * Explicit Value Constructor
     *
     * @param address   The IP address to send to
     * @param port      The UDP port to send to
     * @throws SocketException if the DatagramSocket cannot be constructed
     */
    public UDPMessageSender(InetAddress address, int port)
        throws SocketException
    {
        // Remember the destination address and port
        ipAddress = address;
        this.port = port;

        // Construct a DatagramSocket using any available port
        ds = new DatagramSocket();

        // Construct the message queue
        // (If needed, one could use a java.util.concurrent.BlockingQueue)
        messageQueue = Collections.synchronizedList(new LinkedList<String>());

        // Construct the Object that is used to signal the
        // state of the messageQueue
        signal = new Object();
    }

    /**
     * The code that is executed in the controlThread object's
     * thread of execution
     */
    public void run()
    {
        String message;

        try
        {
            while (keepRunning)
            {
                while (messageQueue.size() > 0)
                {
                    // Remove the message at the front of the queue (Note:
                    // Since messageQueue is "wrapped", unsynchronized
                    // access is prevented)
                    message = messageQueue.remove(0);
                    transmit(message);
                }

                // Enter the wait state until the messageQueue isn't empty
                synchronized(signal)
                {
                    try
                    {
                        // Check to make sure nothing has been added to
                        // the queue in another thread
                        if (messageQueue.size() == 0)
                        {
                            signal.wait();
                        }
                    }
                    catch (InterruptedException ie)
                    {
                        // This probably means stop() was called. If not,
                        // just try again.
                    }
                }
            }
        }
        finally
        {
            // Reset even when the loop ends because of an unchecked
            // exception; otherwise start() could never create a
            // replacement thread
            controlThread = null;
        }
    }

    /**
     * Send a message.
     *
     * Actually, this method puts the message in a queue. The
     * message will actually be sent in another thread of execution.
     *
     * @param message   The message to send
     */
    public void send(String message)
    {
        // Notify other threads that the messageQueue has changed
        synchronized(signal)
        {
            // Add to the end of the queue (Note: Since messageQueue is
            // "wrapped", unsynchronized access is prevented).
            messageQueue.add(message);

            // It is possible that the message was already handled in
            // another thread making this notification extraneous, but
            // it doesn't hurt. (We could, instead, check
            // messageQueue.size() first.)
            signal.notifyAll();
        }
    }

    /**
     * Set the IP address to send to
     *
     * @param address   The new IP address
     */
    public void setAddress(InetAddress address)
    {
        this.ipAddress = address;
    }

    /**
     * Set the port number to send to
     *
     * @param port   The new port number
     */
    public void setPort(int port)
    {
        this.port = port;
    }

    /**
     * Start the thread of execution that actually sends
     * the messages (does nothing if one already exists)
     */
    public void start()
    {
        if (controlThread == null)
        {
            controlThread = new Thread(this);
            keepRunning = true;
            controlThread.start();
        }
    }

    /**
     * Stop the thread of execution (after it finishes sending
     * messages). Calling this method when no thread is running
     * is harmless.
     */
    public void stop()
    {
        Thread thread;

        keepRunning = false;

        // Work with a local copy because the control thread sets the
        // attribute to null when it ends; there is nothing to interrupt
        // if start() was never called or the thread has already ended
        thread = controlThread;
        if (thread != null)
        {
            // Force the thread out of the wait state (if necessary)
            thread.interrupt();
        }
    }

    /**
     * Actually transmit a message
     *
     * @param message   The message
     */
    private void transmit(String message)
    {
        byte[]         data;
        DatagramPacket dp;

        try
        {
            // Convert the String to a byte array (Note: getBytes() with
            // no argument uses the platform's default charset)
            data = message.getBytes();

            // Construct a packet (that includes the address and port)
            dp = new DatagramPacket(data, data.length, ipAddress, port);

            // Send the packet
            ds.send(dp);
        }
        catch (IOException ioe)
        {
            // To debug: ioe.printStackTrace();
        }
    }
}
UDPMessageReceiver
package internet;

/**
 * The contract for objects that want to be told about text messages.
 *
 * @version 1.0
 * @author  Prof. David Bernstein, James Madison University
 */
public interface MessageListener
{
    /**
     * Respond to a single text message.
     *
     * @param text   The text of the message
     */
    void handleMessage(String text);
}
package internet; import java.io.*; import java.net.*; import java.util.*; import java.util.concurrent.*; /** * Receives text messages (with a maximum length of 255 bytes) on a * UDP port and informs observers of those messages in a separate * thread of execution * * @version 1.0 * @author Prof. David Bernstein, James Madison University */ public class UDPMessageReceiver implements Runnable { private volatile boolean keepRunning; private DatagramSocket ds; private CopyOnWriteArrayList<MessageListener> listeners; private int port; private Thread controlThread; /** * Explicit Value Constructor * * @param port The UDP port to listen to */ public UDPMessageReceiver(int port) throws IOException { this.port = port; keepRunning = true; ds = new DatagramSocket(port); // throws IOException // Make sure that receive() times-out so the thread can be stopped ds.setSoTimeout(5000); // A CopyOnWriteArrayList is used because: we need to preclude // interference across threads, iterations vastly outnumber // mutations, and we don't want to synchronize iterations listeners = new CopyOnWriteArrayList<MessageListener>(); } /** * Add a MessageListener * * @param listener The MessageListener to add */ public void addMessageListener(MessageListener listener) { // This may be slow (because listeners is a CopyOnWriteArrayList) // but shouldn't happen frequently listeners.add(listener); } /** * Notify all MessageListener objects * * @param message The text message */ public void notifyListeners(String message) { Iterator<MessageListener> i; MessageListener listener; i = listeners.iterator(); while (i.hasNext()) { listener = i.next(); listener.handleMessage(message); } } /** * Remove a MessageListener * * @param listener The MessageListener to remove */ public void removeMessageListener(MessageListener listener) { // This may be slow (because listeners is a CopyOnWriteArrayList) // but shouldn't happen frequently listeners.remove(listener); } /** * Run this Receiver */ public void run() { 
byte[] buffer; DatagramPacket in; int bufferLength; String line; bufferLength = 255; while (keepRunning) { try { // Construct an empty byte array buffer = new byte[bufferLength]; // Construct an empty DatagramPacket in = new DatagramPacket(buffer, buffer.length); // Block until an incoming packet is received, // then fill the byte array ds.receive(in); // Construct a String from the byte array line=new String(in.getData(), 0, in.getLength()); // Notify all listeners that a packet has arrived notifyListeners(line); } catch (SocketTimeoutException ste) { // receive() timed-out. Check keepRunning and proceed. } catch (IOException ioe) { // receive() had a problem. Re-try. } } controlThread = null; } /** * Start the thread of execution that actually receives * the messages */ public void start() { if (controlThread == null) { controlThread = new Thread(this); keepRunning = true; controlThread.start(); } } /** * Stop the thread of execution (after it finishes receiving * messages) */ public void stop() { keepRunning = false; } }
import java.io.*; import java.net.*; import java.util.*; import internet.*; /** * An application that reads position reports from a file * and transmits them using UDP * * @version 1.0 * @author Prof. David Bernstein, James Madison University */ public class UDPPositionServer { public static void main(String[] args) { BufferedReader in; byte[] data; InetAddress ipAddress; int i, port; String[] track; UDPMessageSender sender; track = new String[396]; try { in = new BufferedReader( new FileReader("track.txt")); for (i=0; i<396; i++) { track[i] = in.readLine(); } ipAddress = InetAddress.getByName(args[0]); port = Integer.valueOf(args[1]).intValue(); sender = new UDPMessageSender(ipAddress, port); sender.start(); // It should also be stopped! System.out.println("Reporting to "+args[0]+ " on port "+port+"\n"); i = 0; while (true) { try { Thread.sleep(1000); sender.send(track[i]); i++; if (i >= track.length-1) i=0; } catch (InterruptedException ie) { // Do nothing } } } catch (UnknownHostException uhe) { System.err.println("Unknown host: "+uhe); System.exit(1); } catch (IOException ioes) { System.err.println("Unable to open socket: "+ ioes); System.exit(1); } } }
import java.awt.Point;
import java.io.*;
import java.util.*;

/**
 * An abstract "generator" of position reports
 *
 * Listeners are added, removed and copied while holding the lock on
 * the listeners collection. Subclasses that access the collection
 * directly should synchronize on it as well.
 *
 * @version 1.0
 * @author  Prof. David Bernstein, James Madison University
 *
 */
public abstract class AbstractPositionSubject implements PositionSubject
{
    protected HashSet<PositionListener>      listeners;

    /**
     * Default Constructor
     */
    public AbstractPositionSubject()
    {
        listeners = new HashSet<PositionListener>();
    }

    /**
     * Add a position listener
     *
     * @param listener   The listener to add
     */
    public void addPositionListener(PositionListener listener)
    {
        synchronized (listeners)
        {
            listeners.add(listener);
        }
    }

    /**
     * Notify all position listeners of a position report
     *
     * The listeners that are notified are those registered at the time
     * of the call; changes made while the notification is in progress
     * take effect for the next report.
     *
     * @param position   The position report
     */
    public void notifyListeners(Point position)
    {
        List<PositionListener>  snapshot;

        // Iterate over a copy so that a listener can add/remove listeners
        // from handlePositionReport(), and other threads can add/remove
        // listeners, without causing a ConcurrentModificationException
        synchronized (listeners)
        {
            snapshot = new ArrayList<PositionListener>(listeners);
        }

        // The lock is not held while the listeners are being called
        for (PositionListener listener : snapshot)
        {
            listener.handlePositionReport(position);
        }
    }

    /**
     * Remove a position listener
     *
     * @param listener   The listener to remove
     */
    public void removePositionListener(PositionListener listener)
    {
        synchronized (listeners)
        {
            listeners.remove(listener);
        }
    }
}
import java.awt.*; import java.io.*; import java.util.*; import internet.*; /** * A class for receiving position reports using UDP * * @version 1.0 * @author Prof. David Bernstein, James Madison University */ public class UDPPositionReceiver extends AbstractPositionSubject implements MessageListener { private UDPMessageReceiver receiver; /** * Explicit Value Constructor * * @param port The UDP port to listen to */ public UDPPositionReceiver(int port) throws IOException { super(); receiver = new UDPMessageReceiver(port); receiver.start(); // It should also be stopped! receiver.addMessageListener(this); } /** * Handle text messages containing position reports * * @param line The position messag */ public void handleMessage(String line) { int x, y; String token; StringTokenizer st; try { st = new StringTokenizer(line,","); token = st.nextToken(); x = Integer.valueOf(token.trim()).intValue(); token = st.nextToken(); y = Integer.valueOf(token.trim()).intValue(); notifyListeners(new Point(x,y)); } catch (NoSuchElementException nsee) { // Bad position } catch (NumberFormatException nfe) { // Bad position } } }
import java.awt.*;

/**
 * A canvas for drawing road maps and plotting
 * vehicle tracks on them
 *
 * @version 1.0
 * @author  Prof. David Bernstein, James Madison University
 *
 */
public class TrackingCanvas extends    RoadMapCanvas
                            implements PositionListener
{
    private Color     vehicleColor;

    /**
     * Construct a new TrackingCanvas
     */
    public TrackingCanvas()
    {
        super();
        vehicleColor = new Color(255,0,255);
    }

    /**
     * Handle a position report
     *
     * @param position  The position report
     */
    public void handlePositionReport(Point position)
    {
        plotPosition(position);
    }

    /**
     * Plots the position of a vehicle on the map (as a 5x5 filled
     * rectangle centered on the position)
     *
     * @param p   The position
     */
    protected void plotPosition(Point p)
    {
        Graphics g;

        // NOTE(review): getGraphics() is inherited through RoadMapCanvas,
        // presumably from java.awt.Component, which returns null while
        // the component is not displayable -- confirm
        g = getGraphics();
        if (g == null)
        {
            // Nothing to draw on
            return;
        }

        try
        {
            g.setColor(vehicleColor);
            g.fillRect(p.x-2, p.y-2, 5, 5);
        }
        finally
        {
            // A Graphics object obtained from getGraphics() should be
            // disposed of when it is no longer needed
            g.dispose();
        }
    }
}
DatagramPacket
that has an
appropriate IP address (i.e., one that starts
with the bits 1110
and so is in the range 224.0.0.0 -
239.255.255.255, but not in the range 224.0.0.0 - 224.0.0.255, all of which
are reserved for use by routing protocols)