|
Customizing UDP Sockets
An Introduction with Examples in Java |
|
Prof. David Bernstein |
| Computer Science Department |
| bernstdh@jmu.edu |
DatagramSocket to both send and
receive but our sender and receiver will need
different capabilities
DatagramSocket class are
inappropriate for this application
setSoTimeout()
receive() in an ackThread
send()
send() for re-sends in the
catch for the
SocketTimeoutException
import java.io.*;
import java.net.*;
import java.util.concurrent.*;
/**
 * A socket that is used to send an expiring sequence of datagrams.
 * That is, a socket that is used to send a sequence of datagrams,
 * each of which expires when the next becomes available.
 *
 * Every datagram that is sent is PAYLOAD_LENGTH+1 bytes long: the
 * payload (zero-padded) followed by a one-byte packet number. After a
 * send, a task running in a thread pool waits for an acknowledgement
 * (a datagram whose last byte is the negated packet number) and
 * re-sends the datagram each time the receive times out.
 *
 * @author  Prof. David Bernstein, James Madison University
 * @version 1.0
 */
public class ESDSendingSocket implements Runnable
{
    // Both flags are written by the sending thread and the ACK thread
    private volatile boolean      isExecuting, keepWaitingForAcknowledgement;
    private byte                  lastPacketNumber, packetNumber;
    private DatagramPacket        lastPacket;
    private final DatagramSocket  ds;
    private final ExecutorService threadPool;
    // Held by the ACK thread for as long as it is waiting/re-sending
    private final Object          ackLock;

    public static final int       PAYLOAD_LENGTH = 254;
    // Milliseconds that receive() blocks before a re-send is attempted
    private static final int      DEFAULT_RECEIVE_TIMEOUT = 1000;

    /**
     * Default Constructor
     *
     * @throws SocketException if the underlying DatagramSocket can't
     *                         be created or configured
     */
    public ESDSendingSocket() throws SocketException
    {
        ds = new DatagramSocket();
        ds.setSoTimeout(DEFAULT_RECEIVE_TIMEOUT);

        keepWaitingForAcknowledgement = false;
        lastPacketNumber = 1;
        packetNumber     = 1;

        ackLock    = new Object();
        threadPool = Executors.newCachedThreadPool();
    }

    /**
     * Close this socket
     *
     * This method blocks while an ACK thread holds the lock (which
     * can take as long as one receive timeout).
     */
    public void close()
    {
        // Stop the ACK thread
        keepWaitingForAcknowledgement = false;

        // Block until the ACK thread stops
        synchronized(ackLock)
        {
            // Shutdown the thread pool
            threadPool.shutdown();
        }

        // Close the DatagramSocket
        ds.close();
    }

    /**
     * Wait for acknowledgements in another
     * thread of execution (required by Runnable)
     *
     * The loop ends when the matching acknowledgement arrives or when
     * another thread clears keepWaitingForAcknowledgement.
     */
    @Override
    public void run()
    {
        byte            ackNumber;
        byte[]          esdData;
        DatagramPacket  ack;

        // Setup
        esdData = new byte[PAYLOAD_LENGTH+1];
        ack     = new DatagramPacket(esdData, esdData.length);

        synchronized(ackLock)
        {
            while (keepWaitingForAcknowledgement)
            {
                try
                {
                    // Receive the ACK
                    ds.receive(ack);

                    // Get the packet number being ACKed (the receiver
                    // negates the number, so negate it again)
                    esdData   = ack.getData();
                    ackNumber = (byte)((-1) * (int)(esdData[PAYLOAD_LENGTH]));

                    // If it's the right ACK, stop waiting
                    if (ackNumber == lastPacketNumber)
                    {
                        keepWaitingForAcknowledgement = false;
                    }
                }
                catch (SocketTimeoutException ste)
                {
                    try
                    {
                        // No ACK arrived in time so re-send the packet
                        ds.send(lastPacket);
                    }
                    catch (IOException ignored)
                    {
                        // Deliberately ignored: try to receive() again
                    }
                }
                catch (IOException ignored)
                {
                    // Deliberately ignored: try again
                }
            }

            // Clear the flag while the lock is still held so that this
            // write can't land after a send() that was blocked on the
            // lock has already set the flag for the next ACK thread
            isExecuting = false;
        }
    }

    /**
     * Send a packet
     *
     * The bytes p.getOffset() through p.getOffset()+p.getLength()-1 of
     * the packet's buffer are sent. At most PAYLOAD_LENGTH bytes are
     * sent; any additional bytes are not transmitted.
     *
     * If an ACK thread is still active for the previous packet this
     * method asks it to stop and blocks while it holds the lock
     * (which can take as long as one receive timeout).
     *
     * @param p   The packet to send
     * @throws IOException if the initial transmission fails
     */
    public void send(DatagramPacket p) throws IOException
    {
        byte[]          data, esdData;
        DatagramPacket  packet;
        int             length;

        // Construct a new array that is long enough
        // to handle the data and the packet number
        data    = p.getData();
        esdData = new byte[PAYLOAD_LENGTH+1];

        // Copy only the bytes that belong to this packet (the backing
        // array can be longer than the packet's offset/length window)
        length = Math.min(p.getLength(), PAYLOAD_LENGTH);
        System.arraycopy(data, p.getOffset(), esdData, 0, length);

        // Insert the packet number
        esdData[PAYLOAD_LENGTH] = packetNumber;

        // Construct the DatagramPacket
        packet = new DatagramPacket(esdData, esdData.length,
                                    p.getAddress(), p.getPort());

        // Stop the previous ACK thread (if there is one)
        if (isExecuting)
        {
            keepWaitingForAcknowledgement = false;

            synchronized(ackLock)
            {
                // Nothing to do: acquiring the lock is what blocks
                // while an ACK thread is still inside run()
            }
        }

        // Record what the ACK thread must wait for (and re-send)
        // BEFORE the thread is started so it never sees stale values
        lastPacket       = packet;
        lastPacketNumber = packetNumber;

        // Create and start another ACK thread
        keepWaitingForAcknowledgement = true;
        isExecuting                   = true;
        threadPool.execute(this);

        // Send the packet for the first time
        ds.send(packet);

        // Increment the packet number for the next send (wrapping
        // from 127 back to 1 so that it is always positive)
        packetNumber++;
        if (packetNumber < 0) packetNumber = 1;
    }
}
receive()
send() after receive() returns
receive() stops blocking
receive()
send() in an ackThread after
receive() returns
import java.io.*;
import java.net.*;
/**
 * A socket that is used to receive an expiring sequence of datagrams.
 * That is, a socket that is used to receive a sequence of
 * datagrams, each of which expires when the next becomes available.
 *
 * Each datagram is expected to contain PAYLOAD_LENGTH bytes of payload
 * followed by a one-byte packet number. Every datagram that is
 * received is acknowledged by sending it back to its source with
 * the packet number negated.
 *
 * @author  Prof. David Bernstein, James Madison University
 * @version 1.0
 */
public class ESDReceivingSocket
{
    private final DatagramSocket ds;
    private static final int     PAYLOAD_LENGTH = ESDSendingSocket.PAYLOAD_LENGTH;

    /**
     * Explicit Value Constructor
     *
     * @param port   The port to use
     * @throws SocketException if the underlying DatagramSocket can't
     *                         be created or bound to the port
     */
    public ESDReceivingSocket(int port) throws SocketException
    {
        ds = new DatagramSocket(port);
    }

    /**
     * Close this socket
     */
    public void close()
    {
        ds.close();
    }

    /**
     * Receive a packet
     *
     * This method blocks until a datagram arrives. The payload is
     * placed in a new array of length PAYLOAD_LENGTH that replaces the
     * buffer of the given packet (with an offset of 0), and the
     * address and port of the given packet are set to those of the
     * source. An acknowledgement is then sent back to the source.
     *
     * @param p   The DatagramPacket to fill
     * @throws IOException if the receive or the acknowledgement fails
     */
    public void receive(DatagramPacket p) throws IOException
    {
        byte[]          data, esdData;
        DatagramPacket  packet;

        esdData = new byte[PAYLOAD_LENGTH+1];
        packet  = new DatagramPacket(esdData, esdData.length);
        ds.receive(packet);

        // Copy the payload into the new array. The source position is
        // the offset of the packet that was received, NOT the offset
        // of the caller's packet (which has nothing to do with the
        // layout of the receive buffer)
        data = new byte[PAYLOAD_LENGTH];
        System.arraycopy(esdData, packet.getOffset(),
                         data, 0, PAYLOAD_LENGTH);

        // Fill the datagram packet to be returned
        p.setAddress(packet.getAddress());
        p.setData(data, 0, PAYLOAD_LENGTH);
        p.setPort(packet.getPort());

        // An acknowledgement is required so negate
        // the packet number and send it back (the received packet
        // already has the source's address and port)
        esdData[PAYLOAD_LENGTH] = (byte)((-1)*(int)(esdData[PAYLOAD_LENGTH]));
        ds.send(packet);
    }
}
Appropriate when the Transmitter/Receiver are the Same