JMU
Customizing UDP Sockets
An Introduction with Examples in Java


Prof. David Bernstein
James Madison University

Computer Science Department
bernstdh@jmu.edu


Motivation
Possible Approaches
An Example
High Level Design Considerations
Low Level Design Alternatives for the Transmitter
Low Level Designs for the Transmitter (cont.)
Low Level Designs for the Transmitter (cont.)
Low Level Designs for the Transmitter (cont.)
Multithreading in the Transmitter
The Transmitter
javaexamples/esdsocket/ESDSendingSocket.java
        import java.io.*;
import java.net.*;
import java.util.concurrent.*;

/**
 * A socket that is used to send an expiring sequence of datagrams.
 * That is, a socket that is used to send a sequence of datagrams,
 * each of which expires when the next becomes available.
 *
 * @author  Prof. David Bernstein, James Madison University
 * @version 1.0
 */
public class ESDSendingSocket implements Runnable
{
    private  volatile boolean      isExecuting, keepWaitingForAcknowledgement;
    private  byte                  lastPacketNumber, packetNumber;
    private  DatagramPacket        lastPacket;
    private  DatagramSocket        ds;
    private  final ExecutorService threadPool;    
    private  Object                ackLock;    


    public  static final int  PAYLOAD_LENGTH          =  254;
    private static final int  DEFAULT_RECEIVE_TIMEOUT = 1000;


    /**
     * Default Constructor
     */
    public ESDSendingSocket() throws SocketException
    {
       ds = new DatagramSocket();
       ds.setSoTimeout(DEFAULT_RECEIVE_TIMEOUT);
        
       keepWaitingForAcknowledgement = false;

       lastPacketNumber = 1;
       packetNumber     = 1;

       ackLock = new Object();

       threadPool   = Executors.newCachedThreadPool();
    }

    /**
     * Close this socket
     */
    public void close()
    {
       // Stop the ACK thread
       keepWaitingForAcknowledgement = false;

       // Block until the ACK thread stops
       synchronized(ackLock)
       {
          // Shutdown the thread pool
          threadPool.shutdown();       
       }

       // Close the DatagramSocket
       ds.close();
    }

    /**
     * Wait for acknowledgements in another
     * thread of execution (required by Runnable)
     */
    public void run()
    {
       byte                ackNumber;
       byte[]              esdData;
       DatagramPacket      ack;

       // Setup
       ackNumber = 0;       
       esdData   = new byte[PAYLOAD_LENGTH+1];
       ack       = new DatagramPacket(esdData, esdData.length);

       synchronized(ackLock)
       {
          while (keepWaitingForAcknowledgement) 
          {
             try 
             {
                // Receive the ACK
                ds.receive(ack);

                // Get the packet number being ACKed
                esdData = ack.getData();
                ackNumber = (byte)((-1) * (int)(esdData[PAYLOAD_LENGTH]));

                // If it's the right ACK, stop waiting
                if (ackNumber == lastPacketNumber) 
                {
                   keepWaitingForAcknowledgement = false;
                }
             }
             catch (SocketTimeoutException ste)
             {
                try
                {
                   // Re-send the packet
                   ds.send(lastPacket);
                }
                catch (IOException ioe)
                {
                   // Try to receive() again
                }
             }
             catch (IOException ioe) 
             {
                // Try again
             }
          }
       }
       isExecuting = false;       
    }

    /**
     * Send a packet
     *
     * @param p    The packet to send
     */
    public void send(DatagramPacket p) throws IOException
    {
       byte[]           data, esdData;
       DatagramPacket   packet;


       // Construct a new array that is long enough
       // to handle the data and the packet number
       data = p.getData();
       esdData = new byte[PAYLOAD_LENGTH+1];

       // Copy the appropriate bytes into the new array
       System.arraycopy(data, p.getOffset(), 
                        esdData, 0, data.length);

       // Insert the packet number
       esdData[PAYLOAD_LENGTH] = packetNumber;


       // Construct the DatagramPacket
       packet = new DatagramPacket(esdData, esdData.length,
                                   p.getAddress(), p.getPort());

       // Stop the previous ACK thread (if there is one)
       if (isExecuting)
       {
          keepWaitingForAcknowledgement = false;

          // Block until the ACK thread stops
          synchronized(ackLock)
          {
             // Setup the next ACK thread
             keepWaitingForAcknowledgement = true;
             isExecuting = true;       
          }
       }

       // Create and start another ACK thread
       keepWaitingForAcknowledgement = true;
       isExecuting = true;       
       threadPool.execute(this);       

       // Send the packet for the first time
       lastPacket       = packet;
       lastPacketNumber = packetNumber;
       ds.send(packet);

       // Increment the packet number for the next send
       packetNumber++;
       if (packetNumber < 0) packetNumber = 1;
    }
}
        
Low Level Designs for the Receiver (cont.)
Low Level Design Alternatives for the Receiver
The Receiver
javaexamples/esdsocket/ESDReceivingSocket.java
        import java.io.*;
import java.net.*;

/**
 * A socket that is used to receive an expiring sequence of datagrams.
 * That is, a socket that is used to receive a sequence of
 * datagrams, each of which expires when the next becomes available.
 *
 * @author  Prof. David Bernstein, James Madison University
 * @version 1.0
 */
public class ESDReceivingSocket
{
    private DatagramSocket         ds;
    private int                    PAYLOAD_LENGTH;


    /**
     * Explicit Value Constructor
     *
     * @param port   The port to use
     */
    public ESDReceivingSocket(int port) throws SocketException
    {
        ds = new DatagramSocket(port);
        PAYLOAD_LENGTH = ESDSendingSocket.PAYLOAD_LENGTH;
    }

    /**
     * Close this socket
     */
    public void close()
    {
       ds.close();
    }

    /**
     * Receive a packet
     *
     * @param p   The DatagramPacket to fill
     */
    public void receive(DatagramPacket p) throws IOException
    {
        byte[]              data, esdData;
        DatagramPacket      packet;


        esdData = new byte[PAYLOAD_LENGTH+1];
        packet = new DatagramPacket(esdData, esdData.length);
        ds.receive(packet);
        esdData = packet.getData();

        // Copy the appropriate bytes into the new array
        data = new byte[PAYLOAD_LENGTH];
        System.arraycopy(esdData, p.getOffset(), 
                         data, 0, PAYLOAD_LENGTH);

        // Fill the datagram packet to be returned
        p.setAddress(packet.getAddress());
        p.setData(data, packet.getOffset(), PAYLOAD_LENGTH);
        p.setPort(packet.getPort());

        // An acknowledgement is required so negate 
        // the packet number and send it back
        esdData[esdData.length-1]=(byte)
                                  ((-1)*(int)(esdData[PAYLOAD_LENGTH]));

        ds.send(packet);
    }
}
        
Using a Factory

Appropriate when the Transmitter/Receiver are the Same

images/custom_datagram_sockets_java.gif