|
UDP for Text Transport
An Introduction with Examples in Java |
|
Prof. David Bernstein |
| Computer Science Department |
| bernstdh@jmu.edu |
DatagramPacket dp;
byte[] data;

try
{
  // Encode the text as bytes (using the platform's default charset)
  data = message.getBytes();

  // Wrap the bytes in a datagram addressed to the destination
  // host and port
  dp = new DatagramPacket(data, data.length, ipAddress, port);

  // Hand the datagram to the socket for transmission
  ds.send(dp);
}
catch (IOException ioe)
{
  // Deliberately ignored. To debug: ioe.printStackTrace();
}
String line;
int bufferLength = 255;
byte[] buffer;
DatagramPacket in;

try
{
  // Allocate the storage that the incoming datagram will be copied into
  buffer = new byte[bufferLength];

  // Create a datagram that is backed by that storage
  in = new DatagramPacket(buffer, buffer.length);

  // Wait (i.e., block) for a datagram to arrive; its contents are
  // placed in the byte array
  ds.receive(in);

  // Decode only the bytes that were actually received
  line = new String(in.getData(), 0, in.getLength());
}
catch (IOException ioe)
{
  // receive() had a problem. Re-try.
}
The receive() method blocks.
Have the receive() method time out periodically
if necessary (by calling the setSoTimeout()
method before receive())
UDPMessageSender
package internet;
import java.io.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.*;
/**
 * Sends text messages to a UDP port in a separate thread of execution.
 *
 * Messages are placed in a queue by send() and are transmitted, in
 * order, by the thread that is created in start(). Each message is
 * encoded using UTF-8 and sent in a single datagram.
 *
 * @version 1.0
 * @author  Prof. David Bernstein, James Madison University
 */
public class UDPMessageSender implements Runnable
{
  // The character encoding used to convert messages to bytes
  private static final String   CHARSET = "UTF-8";

  // These attributes are written in one thread and read in another,
  // hence they are volatile
  private volatile boolean      keepRunning;
  private volatile InetAddress  ipAddress;
  private volatile int          port;
  private volatile Thread       controlThread;

  private final DatagramSocket  ds;
  private final List<String>    messageQueue;
  private final Object          signal;

  /**
   * Explicit Value Constructor (that uses port 9001)
   *
   * @param address  The IP address to send to
   * @throws SocketException if the socket can't be opened
   */
  public UDPMessageSender(InetAddress address)
    throws SocketException
  {
    this(address, 9001);
  }

  /**
   * Explicit Value Constructor
   *
   * @param address  The IP address to send to
   * @param port     The UDP port to send to
   * @throws SocketException if the socket can't be opened
   */
  public UDPMessageSender(InetAddress address,
                          int port)
    throws SocketException
  {
    // Remember the destination address and port
    ipAddress = address;
    this.port = port;

    // Construct a DatagramSocket using any available port
    ds = new DatagramSocket();

    // Construct the message queue
    // (If needed, one could use a java.util.concurrent.BlockingQueue)
    messageQueue = Collections.synchronizedList(new LinkedList<String>());

    // Construct the Object that is used to signal the
    // state of the messageQueue
    signal = new Object();
  }

  /**
   * The code that is executed in the controlThread object's
   * thread of execution
   */
  public void run()
  {
    while (keepRunning)
    {
      drainQueue();

      // Enter the wait state until the messageQueue isn't empty
      synchronized(signal)
      {
        try
        {
          // Check to make sure nothing has been added to the queue
          // in another thread (send() adds while holding the
          // same lock so a notification can't be missed)
          if (messageQueue.isEmpty())
          {
            signal.wait();
          }
        }
        catch (InterruptedException ie)
        {
          // This probably means stop() was called. If not,
          // just try again.
        }
      }
    }

    // stop() promises that queued messages are sent before the
    // thread terminates, so transmit anything that arrived between
    // the last pass through the queue and the call to stop()
    drainQueue();

    controlThread = null;
  }

  /**
   * Send a message.
   *
   * Actually, this method puts the message in a queue. The
   * message will actually be sent in another thread of execution.
   * A null message is ignored (because it can't be encoded and
   * would otherwise terminate the sending thread).
   *
   * @param message  The message to send
   */
  public void send(String message)
  {
    if (message == null)
    {
      return;
    }

    // Notify other threads that the messageQueue has changed
    synchronized(signal)
    {
      // Add to the end of the queue (Note: Since messageQueue is
      // "wrapped", unsynchronized access is prevented).
      messageQueue.add(message);

      // It is possible that the message was already handled in
      // another thread making this notification extraneous, but
      // it doesn't hurt.
      signal.notifyAll();
    }
  }

  /**
   * Set the IP address to send to
   *
   * @param address  The new IP address
   */
  public void setAddress(InetAddress address)
  {
    this.ipAddress = address;
  }

  /**
   * Set the port number to send to
   *
   * @param port  The new port number
   */
  public void setPort(int port)
  {
    this.port = port;
  }

  /**
   * Start the thread of execution that actually sends
   * the messages (if it isn't already running)
   */
  public void start()
  {
    if (controlThread == null)
    {
      controlThread = new Thread(this);
      keepRunning   = true;
      controlThread.start();
    }
  }

  /**
   * Stop the thread of execution (after it finishes sending
   * messages). Calling this method when the thread isn't running
   * has no effect.
   */
  public void stop()
  {
    // Use a local copy because run() sets controlThread to null
    // (in another thread) when it terminates
    Thread worker = controlThread;

    keepRunning = false;

    // Force the thread out of the wait state (if necessary)
    if (worker != null)
    {
      worker.interrupt();
    }
  }

  /**
   * Transmit all of the messages that are currently in the queue
   * (in the order in which they were added)
   */
  private void drainQueue()
  {
    while (!messageQueue.isEmpty())
    {
      // Remove the message at the front of the queue (Note:
      // Since messageQueue is "wrapped", unsynchronized access
      // is prevented and this is the only thread that removes)
      transmit(messageQueue.remove(0));
    }
  }

  /**
   * Actually transmit a message
   *
   * @param message  The message
   */
  private void transmit(String message)
  {
    byte[]          data;
    DatagramPacket  dp;

    try
    {
      // Convert the String to a byte array using an explicit
      // encoding (so that the result doesn't depend on the platform)
      data = message.getBytes(CHARSET);

      // Construct a packet (that includes the address and port)
      dp = new DatagramPacket(data, data.length, ipAddress, port);

      // Send the packet
      ds.send(dp);
    }
    catch (IOException ioe)
    {
      // UDP delivery is best-effort so a message that can't be
      // sent is dropped. To debug: ioe.printStackTrace();
    }
  }
}
UDPMessageReceiver
package internet;
/**
 * The requirements of objects that want to be informed of
 * incoming text messages
 *
 * @version 1.0
 * @author  Prof. David Bernstein, James Madison University
 */
public interface MessageListener
{
  /**
   * Respond to a single text message
   *
   * @param text  The content of the message
   */
  void handleMessage(String text);
}
package internet;
import java.io.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.*;
/**
 * Receives text messages (with a maximum length of 255 bytes) on a
 * UDP port and informs observers of those messages in a separate
 * thread of execution.
 *
 * Incoming bytes are decoded using UTF-8. The socket is bound in the
 * constructor and stays bound after stop() is called (so that
 * start() can be called again).
 *
 * @version 1.0
 * @author  Prof. David Bernstein, James Madison University
 */
public class UDPMessageReceiver implements Runnable
{
  // The character encoding used to convert bytes to messages
  private static final String   CHARSET            = "UTF-8";

  // The size of the receive buffer (in bytes)
  private static final int      MAX_MESSAGE_LENGTH = 255;

  // How long receive() blocks before timing-out (in milliseconds)
  private static final int      RECEIVE_TIMEOUT    = 5000;

  private volatile boolean      keepRunning;

  // Written in run() (in one thread) and read in start() (in another)
  private volatile Thread       controlThread;

  private final CopyOnWriteArrayList<MessageListener> listeners;
  private final DatagramSocket  ds;
  private final int             port;

  /**
   * Explicit Value Constructor
   *
   * @param port  The UDP port to listen to
   * @throws IOException if the socket can't be opened or configured
   */
  public UDPMessageReceiver(int port) throws IOException
  {
    this.port   = port;
    keepRunning = true;

    ds = new DatagramSocket(port);  // throws IOException

    // Make sure that receive() times-out so the thread can be stopped
    ds.setSoTimeout(RECEIVE_TIMEOUT);

    // A CopyOnWriteArrayList is used because: we need to preclude
    // interference across threads, iterations vastly outnumber
    // mutations, and we don't want to synchronize iterations
    listeners = new CopyOnWriteArrayList<MessageListener>();
  }

  /**
   * Add a MessageListener
   *
   * @param listener  The MessageListener to add
   */
  public void addMessageListener(MessageListener listener)
  {
    // This may be slow (because listeners is a CopyOnWriteArrayList)
    // but shouldn't happen frequently
    listeners.add(listener);
  }

  /**
   * Notify all MessageListener objects.
   *
   * A RuntimeException thrown by one listener is reported on
   * System.err and does not prevent the remaining listeners from
   * being notified (or terminate the receiving thread).
   *
   * @param message  The text message
   */
  public void notifyListeners(String message)
  {
    for (MessageListener listener : listeners)
    {
      try
      {
        listener.handleMessage(message);
      }
      catch (RuntimeException re)
      {
        System.err.println("MessageListener failed: " + re);
      }
    }
  }

  /**
   * Remove a MessageListener
   *
   * @param listener  The MessageListener to remove
   */
  public void removeMessageListener(MessageListener listener)
  {
    // This may be slow (because listeners is a CopyOnWriteArrayList)
    // but shouldn't happen frequently
    listeners.remove(listener);
  }

  /**
   * Run this Receiver (i.e., receive packets and notify the
   * listeners until stop() is called)
   */
  public void run()
  {
    byte[]          buffer;
    DatagramPacket  in;
    String          line;

    // The byte array can be re-used because each message is
    // copied into a String before the next call to receive()
    buffer = new byte[MAX_MESSAGE_LENGTH];

    while (keepRunning)
    {
      try
      {
        // Construct an empty DatagramPacket (which also resets
        // the number of bytes that can be received)
        in = new DatagramPacket(buffer, buffer.length);

        // Block until an incoming packet is received,
        // then fill the byte array
        ds.receive(in);

        // Construct a String from the bytes that were received
        // using an explicit encoding (so that the result doesn't
        // depend on the platform)
        line = new String(in.getData(), in.getOffset(),
                          in.getLength(), CHARSET);

        // Notify all listeners that a packet has arrived
        notifyListeners(line);
      }
      catch (SocketTimeoutException ste)
      {
        // receive() timed-out. Check keepRunning and proceed.
      }
      catch (IOException ioe)
      {
        // receive() had a problem. Re-try, unless the socket is
        // closed (in which case every re-try would fail immediately
        // and this loop would never block).
        if (ds.isClosed())
        {
          break;
        }
      }
    }

    controlThread = null;
  }

  /**
   * Start the thread of execution that actually receives
   * the messages (if it isn't already running)
   */
  public void start()
  {
    if (controlThread == null)
    {
      controlThread = new Thread(this);
      keepRunning   = true;
      controlThread.start();
    }
  }

  /**
   * Stop the thread of execution (after it finishes receiving
   * messages). The thread terminates after the current call to
   * receive() returns or times-out.
   */
  public void stop()
  {
    keepRunning = false;
  }
}
import java.io.*;
import java.net.*;
import java.util.*;
import internet.*;
/**
 * An application that reads position reports from a file
 * and transmits them using UDP.
 *
 * The reports are read from track.txt (one per line, at most 396
 * of them) and are sent, one per second, to the host and port given
 * on the command line. After the last report is sent the application
 * starts over with the first.
 *
 * @version 1.0
 * @author  Prof. David Bernstein, James Madison University
 */
public class UDPPositionServer
{
  // The file containing the position reports
  private static final String  TRACK_FILE      = "track.txt";

  // The maximum number of position reports to read from the file
  private static final int     MAX_REPORTS     = 396;

  // The time between reports (in milliseconds)
  private static final long    REPORT_INTERVAL = 1000;

  /**
   * The entry point of the application
   *
   * @param args  The command-line arguments (host, then port)
   */
  public static void main(String[] args)
  {
    InetAddress       ipAddress;
    int               i, port;
    List<String>      track;
    UDPMessageSender  sender;

    if (args.length < 2)
    {
      System.err.println("Usage: java UDPPositionServer host port");
      System.exit(1);
      return;
    }

    try
    {
      track = readTrack(TRACK_FILE, MAX_REPORTS);
    }
    catch (IOException ioe)
    {
      System.err.println("Unable to read "+TRACK_FILE+": "+ioe);
      System.exit(1);
      return;
    }

    if (track.isEmpty())
    {
      System.err.println("No position reports in "+TRACK_FILE);
      System.exit(1);
      return;
    }

    try
    {
      ipAddress = InetAddress.getByName(args[0]);
      port      = Integer.parseInt(args[1]);
      if ((port < 0) || (port > 65535))
      {
        throw new NumberFormatException("out of range: "+port);
      }
      sender    = new UDPMessageSender(ipAddress, port);
    }
    catch (NumberFormatException nfe)
    {
      System.err.println("Invalid port: "+args[1]);
      System.exit(1);
      return;
    }
    catch (UnknownHostException uhe)
    {
      System.err.println("Unknown host: "+uhe);
      System.exit(1);
      return;
    }
    catch (IOException ioes)
    {
      System.err.println("Unable to open socket: "+
                         ioes);
      System.exit(1);
      return;
    }

    sender.start();
    System.out.println("Reporting to "+args[0]+
                       " on port "+port+"\n");

    i = 0;
    try
    {
      while (true)
      {
        Thread.sleep(REPORT_INTERVAL);
        sender.send(track.get(i));

        // Advance to the next report, wrapping around after
        // the last one has been sent
        i = (i + 1) % track.size();
      }
    }
    catch (InterruptedException ie)
    {
      // Treat an interruption as a request to shut down
      Thread.currentThread().interrupt();
    }
    finally
    {
      sender.stop();
    }
  }

  /**
   * Read position reports (one per line) from a file
   *
   * @param fileName    The name of the file
   * @param maxReports  The maximum number of reports to read
   * @return            The reports (in the order they appear in the file)
   * @throws IOException if the file can't be opened or read
   */
  private static List<String> readTrack(String fileName, int maxReports)
    throws IOException
  {
    BufferedReader  in;
    List<String>    reports;
    String          line;

    reports = new ArrayList<String>();
    in      = new BufferedReader(new FileReader(fileName));
    try
    {
      // Stop at the end of the file so that the result never
      // contains null elements
      while ((reports.size() < maxReports)
             && ((line = in.readLine()) != null))
      {
        reports.add(line);
      }
    }
    finally
    {
      in.close();
    }

    return reports;
  }
}
import java.awt.Point;
import java.io.*;
import java.util.*;
/**
 * An abstract "generator" of position reports.
 *
 * Listeners can be added and removed while a notification is in
 * progress (from the same thread or from another thread) because
 * notifyListeners() iterates over a copy of the collection. Subclasses
 * that access the listeners attribute directly are not protected in
 * this way.
 *
 * @version 1.0
 * @author  Prof. David Bernstein, James Madison University
 *
 */
public abstract class AbstractPositionSubject implements PositionSubject
{
  protected HashSet<PositionListener> listeners;

  // Guards the listeners collection in the methods of this class
  private final Object                lock;

  /**
   * Default Constructor
   */
  public AbstractPositionSubject()
  {
    listeners = new HashSet<PositionListener>();
    lock      = new Object();
  }

  /**
   * Add a position listener
   *
   * @param listener  The listener to add
   */
  public void addPositionListener(PositionListener listener)
  {
    synchronized(lock)
    {
      listeners.add(listener);
    }
  }

  /**
   * Notify all position listeners of a position report
   *
   * @param position  The position to report
   */
  public void notifyListeners(Point position)
  {
    PositionListener[]  snapshot;

    // Copy the collection while holding the lock and notify
    // without holding it so that a listener can add/remove
    // listeners without causing a ConcurrentModificationException
    synchronized(lock)
    {
      snapshot = listeners.toArray(new PositionListener[listeners.size()]);
    }

    for (int i=0; i<snapshot.length; i++)
    {
      snapshot[i].handlePositionReport(position);
    }
  }

  /**
   * Remove a position listener
   *
   * @param listener  The listener to remove
   */
  public void removePositionListener(PositionListener listener)
  {
    synchronized(lock)
    {
      listeners.remove(listener);
    }
  }
}
import java.awt.*;
import java.io.*;
import java.util.*;
import internet.*;
/**
 * A class for receiving position reports using UDP.
 *
 * Each text message is expected to contain two comma-delimited
 * integers (the x and y coordinates, in that order). Messages
 * that can't be parsed are ignored.
 *
 * @version 1.0
 * @author  Prof. David Bernstein, James Madison University
 */
public class UDPPositionReceiver extends    AbstractPositionSubject
                                 implements MessageListener
{
  private final UDPMessageReceiver  receiver;

  /**
   * Explicit Value Constructor
   *
   * @param port  The UDP port to listen to
   * @throws IOException if the underlying receiver can't be constructed
   */
  public UDPPositionReceiver(int port) throws IOException
  {
    super();
    receiver = new UDPMessageReceiver(port);

    // Register before starting so that no messages are missed
    receiver.addMessageListener(this);
    receiver.start(); // It should also be stopped!
  }

  /**
   * Handle text messages containing position reports
   *
   * @param line  The position message
   */
  public void handleMessage(String line)
  {
    int              x, y;
    StringTokenizer  st;

    if (line == null)
    {
      // Bad position
      return;
    }

    try
    {
      st = new StringTokenizer(line, ",");
      x  = Integer.parseInt(st.nextToken().trim());
      y  = Integer.parseInt(st.nextToken().trim());

      notifyListeners(new Point(x,y));
    }
    catch (NoSuchElementException nsee)
    {
      // Bad position (fewer than two tokens)
    }
    catch (NumberFormatException nfe)
    {
      // Bad position (a token that isn't an integer)
    }
  }
}
import java.awt.*;
/**
 * A canvas for drawing road maps and plotting
 * vehicle tracks on them
 *
 * @version 1.0
 * @author  Prof. David Bernstein, James Madison University
 *
 */
public class TrackingCanvas extends    RoadMapCanvas
                            implements PositionListener
{
  private Color vehicleColor;

  /**
   * Construct a new TrackingCanvas
   */
  public TrackingCanvas()
  {
    super();
    vehicleColor = new Color(255,0,255);
  }

  /**
   * Handle a position report
   *
   * @param position  The position report
   */
  public void handlePositionReport(Point position)
  {
    plotPosition(position);
  }

  /**
   * Plots the position of a vehicle on the map (as a
   * 5x5 square centered on the position). Nothing is plotted
   * if a graphics context isn't available.
   *
   * @param p  The position
   */
  protected void plotPosition(Point p)
  {
    Graphics g;

    g = getGraphics();

    // getGraphics() returns null when the component isn't displayable
    if (g == null)
    {
      return;
    }

    try
    {
      g.setColor(vehicleColor);
      g.fillRect(p.x-2, p.y-2, 5, 5);
    }
    finally
    {
      // A Graphics object obtained from a component should be
      // released by the code that obtained it
      g.dispose();
    }
  }
}
DatagramPacket that has an
appropriate IP address (i.e., one whose first four bits
are 1110, so it is in the range 224.0.0.0 -
239.255.255.255, but not in the range 224.0.0.0 - 224.0.0.255, all of which
are reserved for use by routing protocols)