import java.io.*;
import java.net.*;
import java.util.concurrent.*;

/**
 * A socket that is used to send an expiring sequence of datagrams.
 * That is, a socket that is used to send a sequence of datagrams,
 * each of which expires when the next becomes available.
 *
 * Every datagram put on the wire is exactly PAYLOAD_LENGTH+1 bytes long:
 * the caller's payload (zero-padded) followed by a one-byte packet number
 * in the range 1..127. An acknowledgement is a datagram whose byte at
 * index PAYLOAD_LENGTH is the negated packet number. Until the matching
 * acknowledgement arrives (or the next packet is sent) the current packet
 * is re-sent each time the receive timeout elapses.
 *
 * This class does not guard against concurrent calls to send(); it is
 * intended to be driven by a single sending thread.
 *
 * @author  Prof. David Bernstein, James Madison University
 * @version 1.0
 */
public class ESDSendingSocket implements Runnable
{
    // Shared between the sending thread and the ACK thread
    private  volatile boolean        keepWaitingForAcknowledgement;
    private  volatile byte           lastPacketNumber;
    private  volatile DatagramPacket lastPacket;

    // Only touched by the sending thread
    private  byte                    packetNumber;

    private  final DatagramSocket    ds;
    private  final ExecutorService   threadPool;

    // Held by the ACK thread for as long as it is waiting, so acquiring
    // it means "the previous ACK thread has left its loop"
    private  final Object            ackLock = new Object();


    public  static final int  PAYLOAD_LENGTH          =  254;
    private static final int  DEFAULT_RECEIVE_TIMEOUT = 1000;


    /**
     * Default Constructor
     *
     * @throws SocketException if the underlying DatagramSocket cannot
     *                         be created or configured
     */
    public ESDSendingSocket() throws SocketException
    {
       ds = new DatagramSocket();
       try
       {
          ds.setSoTimeout(DEFAULT_RECEIVE_TIMEOUT);
       }
       catch (SocketException se)
       {
          // Don't leak the bound socket if it can't be configured
          ds.close();
          throw se;
       }

       keepWaitingForAcknowledgement = false;

       lastPacketNumber = 1;
       packetNumber     = 1;

       threadPool   = Executors.newCachedThreadPool();
    }

    /**
     * Close this socket
     */
    public void close()
    {
       // Stop the ACK thread
       keepWaitingForAcknowledgement = false;

       // Close the DatagramSocket first so that an ACK thread that is
       // blocked in receive() wakes up immediately instead of waiting
       // for the receive timeout to elapse
       ds.close();

       // Block until the ACK thread stops
       synchronized(ackLock)
       {
          // Shutdown the thread pool
          threadPool.shutdown();
       }
    }

    /**
     * Wait for acknowledgements in another
     * thread of execution (required by Runnable)
     */
    @Override
    public void run()
    {
       byte[]              esdData;
       DatagramPacket      ack;

       // Setup
       esdData   = new byte[PAYLOAD_LENGTH+1];
       ack       = new DatagramPacket(esdData, esdData.length);

       synchronized(ackLock)
       {
          while (keepWaitingForAcknowledgement)
          {
             try
             {
                // Make the whole buffer available for this receive
                ack.setLength(esdData.length);

                // Receive the ACK
                ds.receive(ack);

                // A datagram that is too short has no packet-number
                // byte; reading it would look at stale buffer contents
                if (ack.getLength() > PAYLOAD_LENGTH)
                {
                   // Get the packet number being ACKed (it is sent negated)
                   byte ackNumber = (byte)(-esdData[PAYLOAD_LENGTH]);

                   // If it's the right ACK, stop waiting
                   if (ackNumber == lastPacketNumber)
                   {
                      keepWaitingForAcknowledgement = false;
                   }
                }
             }
             catch (SocketTimeoutException ste)
             {
                try
                {
                   // Re-send the packet
                   ds.send(lastPacket);
                }
                catch (IOException ignored)
                {
                   // Best effort: try to receive() again
                }
             }
             catch (IOException ioe)
             {
                // A closed socket can never deliver an ACK, so retrying
                // would just spin; any other failure is retried
                if (ds.isClosed())
                {
                   keepWaitingForAcknowledgement = false;
                }
             }
          }
       }
    }

    /**
     * Send a packet
     *
     * Exactly p.getLength() bytes, starting at p.getOffset(), are copied
     * into a new PAYLOAD_LENGTH+1 byte datagram whose last byte is the
     * packet number. Any previous packet that has not been acknowledged
     * yet expires: its ACK thread is stopped before the new one starts,
     * which may block for up to the receive timeout.
     *
     * @param p    The packet to send
     * @throws IOException              if the datagram cannot be sent
     * @throws IllegalArgumentException if the payload is longer than
     *                                  PAYLOAD_LENGTH bytes
     */
    public void send(DatagramPacket p) throws IOException
    {
       byte[]           esdData;
       DatagramPacket   packet;
       int              length;


       // Reject payloads that don't fit rather than truncating them
       length = p.getLength();
       if (length > PAYLOAD_LENGTH)
       {
          throw new IllegalArgumentException(
             "Payload of " + length + " bytes exceeds PAYLOAD_LENGTH (" +
             PAYLOAD_LENGTH + ")");
       }

       // Construct a new array that is long enough
       // to handle the data and the packet number
       esdData = new byte[PAYLOAD_LENGTH+1];

       // Copy only the bytes that belong to the packet (the backing
       // array may be larger than the packet and start at an offset)
       System.arraycopy(p.getData(), p.getOffset(),
                        esdData, 0, length);

       // Insert the packet number
       esdData[PAYLOAD_LENGTH] = packetNumber;


       // Construct the DatagramPacket
       packet = new DatagramPacket(esdData, esdData.length,
                                   p.getAddress(), p.getPort());

       // Stop the previous ACK thread (if there is one)
       keepWaitingForAcknowledgement = false;

       // Block until the ACK thread stops
       synchronized(ackLock)
       {
          // Publish the new packet before any ACK thread can look at
          // it, so a late ACK is never matched against the old number
          // and a timeout never re-sends the old packet
          lastPacket       = packet;
          lastPacketNumber = packetNumber;

          // Setup the next ACK thread
          keepWaitingForAcknowledgement = true;
       }

       // Create and start another ACK thread
       threadPool.execute(this);

       // Send the packet for the first time
       ds.send(packet);

       // Increment the packet number for the next send
       // (wrapping from 127 back to 1; 0 is never used)
       packetNumber++;
       if (packetNumber < 0) packetNumber = 1;
    }
}
