Customizing UDP Sockets
An Introduction with Examples in Java
Prof. David Bernstein
Computer Science Department
bernstdh@jmu.edu
DatagramSocket
to both send and
receive but our sender and receiver will need
different capabilities
DatagramSocket
class are
inappropriate for this application
setSoTimeout()
receive()
in an ackThread
send()
send()
for re-sends in the
catch
for the
SocketTimeoutException
import java.io.*;
import java.net.*;
import java.util.concurrent.*;

/**
 * A socket that is used to send an expiring sequence of datagrams.
 * That is, a socket that is used to send a sequence of datagrams,
 * each of which expires when the next becomes available.
 *
 * Every datagram that is transmitted is exactly PAYLOAD_LENGTH+1 bytes
 * long; the last byte holds the packet number (1 to 127). After each
 * send() a task running in a thread pool waits for a datagram whose
 * last byte is the negated packet number (the acknowledgement) and
 * re-sends the most recent datagram each time the receive times out.
 *
 * @author  Prof. David Bernstein, James Madison University
 * @version 1.0
 */
public class ESDSendingSocket implements Runnable {
    public static final int PAYLOAD_LENGTH = 254;
    private static final int DEFAULT_RECEIVE_TIMEOUT = 1000;

    // Written by send()/close() and read by the ACK task
    private volatile boolean keepWaitingForAcknowledgement;
    private volatile byte lastPacketNumber;
    private volatile DatagramPacket lastPacket;

    // Only used by send()
    private byte packetNumber;

    private final DatagramSocket ds;
    private final ExecutorService threadPool;

    // Held by the ACK task for as long as it is waiting
    private final Object ackLock;

    /**
     * Default Constructor
     *
     * @throws SocketException if the underlying DatagramSocket cannot
     *         be opened or its receive timeout cannot be set
     */
    public ESDSendingSocket() throws SocketException {
        ds = new DatagramSocket();
        ds.setSoTimeout(DEFAULT_RECEIVE_TIMEOUT);

        keepWaitingForAcknowledgement = false;
        lastPacketNumber = 1;
        packetNumber = 1;

        ackLock = new Object();
        threadPool = Executors.newCachedThreadPool();
    }

    /**
     * Close this socket
     */
    public void close() {
        // Stop the ACK task
        keepWaitingForAcknowledgement = false;

        // Block until the ACK task (if any) releases the lock
        synchronized (ackLock) {
            // Shutdown the thread pool
            threadPool.shutdown();
        }

        // Close the DatagramSocket
        ds.close();
    }

    /**
     * Wait for acknowledgements in another
     * thread of execution (required by Runnable)
     */
    @Override
    public void run() {
        byte[] ackData = new byte[PAYLOAD_LENGTH + 1];
        DatagramPacket ack = new DatagramPacket(ackData, ackData.length);

        synchronized (ackLock) {
            // Also stop when the socket is closed; otherwise every
            // receive() would fail immediately and this would spin
            while (keepWaitingForAcknowledgement && !ds.isClosed()) {
                try {
                    // A previous (short) receive may have shrunk the length
                    ack.setLength(ackData.length);

                    // Receive the ACK
                    ds.receive(ack);

                    // Only a full-size datagram carries a packet number;
                    // for anything shorter the last byte would be stale
                    if (ack.getLength() == ackData.length) {
                        // Get the packet number being ACKed
                        byte ackNumber = (byte) -ackData[PAYLOAD_LENGTH];

                        // If it's the right ACK, stop waiting
                        if (ackNumber == lastPacketNumber) {
                            keepWaitingForAcknowledgement = false;
                        }
                    }
                } catch (SocketTimeoutException ste) {
                    try {
                        // No ACK arrived in time so re-send the packet
                        ds.send(lastPacket);
                    } catch (IOException ignored) {
                        // Best effort: try to receive() again
                    }
                } catch (IOException ignored) {
                    // Best effort: try again (the loop condition ends
                    // the task if the socket has been closed)
                }
            }
        }
    }

    /**
     * Send a packet
     *
     * At most PAYLOAD_LENGTH bytes of the packet's data (starting at
     * its offset) are transmitted; shorter payloads are zero-padded
     * and longer payloads are truncated.
     *
     * This method may block until the task waiting for the previous
     * acknowledgement notices that it should stop (i.e., for up to one
     * receive timeout).
     *
     * @param p  The packet to send
     * @throws IOException if the datagram cannot be sent
     */
    public void send(DatagramPacket p) throws IOException {
        // Construct a new array that is long enough
        // to handle the data and the packet number
        byte[] esdData = new byte[PAYLOAD_LENGTH + 1];

        // Copy the appropriate bytes into the new array (honoring
        // the packet's own offset and length)
        int length = Math.min(p.getLength(), PAYLOAD_LENGTH);
        System.arraycopy(p.getData(), p.getOffset(), esdData, 0, length);

        // Insert the packet number
        esdData[PAYLOAD_LENGTH] = packetNumber;

        // Construct the DatagramPacket
        DatagramPacket packet = new DatagramPacket(esdData, esdData.length,
                                                   p.getAddress(), p.getPort());

        // Stop the previous ACK task (if there is one)
        keepWaitingForAcknowledgement = false;

        // Block until the previous ACK task stops
        synchronized (ackLock) {
            // Record and send the packet before any task can wait for
            // its ACK (so a timeout can never re-send a stale packet)
            lastPacket = packet;
            lastPacketNumber = packetNumber;
            ds.send(packet);

            // Setup the next ACK task
            keepWaitingForAcknowledgement = true;
        }

        // Start another ACK task (an ACK that arrives before it runs
        // is held by the socket until the task calls receive())
        threadPool.execute(this);

        // Increment the packet number for the next send,
        // wrapping from 127 back to 1
        packetNumber++;
        if (packetNumber < 0) {
            packetNumber = 1;
        }
    }
}
receive()
send()
after receive()
returns
receive()
stops blocking
receive()
send()
in an ackThread
after
receive()
returns
import java.io.*;
import java.net.*;

/**
 * A socket that is used to receive an expiring sequence of datagrams.
 * That is, a socket that is used to receive a sequence of
 * datagrams, each of which expires when the next becomes available.
 *
 * Each incoming datagram is expected to hold PAYLOAD_LENGTH bytes of
 * payload followed by one byte containing the packet number. Every
 * datagram that is received is acknowledged by sending it back with
 * the packet number negated.
 *
 * @author  Prof. David Bernstein, James Madison University
 * @version 1.0
 */
public class ESDReceivingSocket {
    // The same payload size that the sending side uses
    private static final int PAYLOAD_LENGTH = ESDSendingSocket.PAYLOAD_LENGTH;

    private final DatagramSocket ds;

    /**
     * Explicit Value Constructor
     *
     * @param port   The port to use
     * @throws SocketException if the underlying DatagramSocket cannot
     *         be opened on the given port
     */
    public ESDReceivingSocket(int port) throws SocketException {
        ds = new DatagramSocket(port);
    }

    /**
     * Close this socket
     */
    public void close() {
        ds.close();
    }

    /**
     * Receive a packet
     *
     * On return the given packet refers to a new array of
     * PAYLOAD_LENGTH bytes (its previous buffer is not written to)
     * and carries the address and port of the sender.
     *
     * @param p   The DatagramPacket to fill
     * @throws IOException if the receive or the acknowledgement fails
     */
    public void receive(DatagramPacket p) throws IOException {
        byte[] esdData = new byte[PAYLOAD_LENGTH + 1];
        DatagramPacket packet = new DatagramPacket(esdData, esdData.length);

        ds.receive(packet);

        // Copy the payload into a new array. The payload always starts
        // at the beginning of the receive buffer; the offset of the
        // caller's packet has nothing to do with the received bytes.
        byte[] data = new byte[PAYLOAD_LENGTH];
        System.arraycopy(esdData, 0, data, 0, PAYLOAD_LENGTH);

        // Fill the datagram packet to be returned
        p.setAddress(packet.getAddress());
        p.setData(data, 0, PAYLOAD_LENGTH);
        p.setPort(packet.getPort());

        // An acknowledgement is required so negate
        // the packet number and send it back
        esdData[PAYLOAD_LENGTH] = (byte) -esdData[PAYLOAD_LENGTH];
        ds.send(packet);
    }
}
Appropriate when the Transmitter/Receiver are the Same