Commented Java code
[iotcloud.git] / src / java / iotcloud / Table.java
index 91ddacaa3c67df88ae25846496b2e2a55993b80e..1d6259a2a31649c680fe2b401e441ade86529493 100644 (file)
@@ -14,23 +14,30 @@ import java.util.Random;
  */
 
 final public class Table {
-       private int numslots;
+       private int numslots;   //number of slots stored in buffer
+
+       //table of key-value pairs
        private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
+
+       // machine id -> (sequence number, Slot or LastMessage); records last message by each client
        private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
+       // machine id -> ...
        private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
+       private Vector<Long> rejectedmessagelist=new Vector<Long>();
        private SlotBuffer buffer;
        private CloudComm cloud;
-       private long sequencenumber;
+       private long sequencenumber; //Largest sequence number a client has received
        private long localmachineid;
        private TableStatus lastTableStatus;
-       static final int FREE_SLOTS = 10;
+       static final int FREE_SLOTS = 10; //number of slots that should be kept free
        static final int SKIP_THRESHOLD = 10;
        private long liveslotcount=0;
        private int chance;
        static final double RESIZE_MULTIPLE = 1.2;
        static final double RESIZE_THRESHOLD = 0.75;
+       static final int REJECTED_THRESHOLD = 5;
        private int resizethreshold;
-       private long lastliveslotseqn;
+       private long lastliveslotseqn;  //smallest sequence number with a live entry
        private Random random=new Random();
        
        public Table(String baseurl, String password, long _localmachineid) {
@@ -106,6 +113,7 @@ final public class Table {
                liveslotcount--;
        }
        
+       
        private void setResizeThreshold() {
                int resize_lower=(int) (RESIZE_THRESHOLD * numslots);
                resizethreshold=resize_lower-1+random.nextInt(numslots-resize_lower);
@@ -114,10 +122,8 @@ final public class Table {
        private boolean tryput(IoTString key, IoTString value, boolean resize) {
                Slot s=new Slot(this, sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
                int newsize = 0;
-               if (liveslotcount > resizethreshold) {
-                       System.out.print("A");
-                       resize=true;
-                       newsize = (int) (numslots * RESIZE_MULTIPLE);
+               if (liveslotcount > resizethreshold) { 
+                       resize=true; //Resize is forced
                }
                
                if (resize) {
@@ -126,6 +132,43 @@ final public class Table {
                        s.addEntry(status);
                }
 
+               if (! rejectedmessagelist.isEmpty()) {
+                       /* TODO: We should avoid generating a rejected message entry if
+                        * there is already a sufficient entry in the queue (e.g.,
+                        * equalsto value of true and same sequence number).  */
+
+                       long old_seqn=rejectedmessagelist.firstElement();
+                       if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
+                               long new_seqn=rejectedmessagelist.lastElement();
+                               RejectedMessage rm=new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
+                               s.addEntry(rm);
+                       } else {
+                               long prev_seqn = -1;
+                               int i=0;
+                               /* Go through list of missing messages */
+                               for(; i<rejectedmessagelist.size(); i++) {
+                                       long curr_seqn = rejectedmessagelist.get(i);
+                                       Slot s_msg = buffer.getSlot(curr_seqn);
+                                       if (s_msg != null)
+                                               break;
+                                       prev_seqn=curr_seqn;
+                               }
+                               /* Generate rejected message entry for missing messages */
+                               if (prev_seqn != -1) {
+                                       RejectedMessage rm=new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
+                                       s.addEntry(rm);
+                               }
+                               /* Generate rejected message entries for present messages */
+                               for(; i<rejectedmessagelist.size(); i++) {
+                                       long curr_seqn=rejectedmessagelist.get(i);
+                                       Slot s_msg=buffer.getSlot(curr_seqn);
+                                       long machineid=s_msg.getMachineID();
+                                       RejectedMessage rm=new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
+                                       s.addEntry(rm);
+                               }
+                       }
+               }
+               
                long newestseqnum = buffer.getNewestSeqNum();
                long oldestseqnum = buffer.getOldestSeqNum();
                if (lastliveslotseqn < oldestseqnum)
@@ -133,25 +176,25 @@ final public class Table {
 
                long seqn = lastliveslotseqn;
                boolean seenliveslot = false;
-               long firstiffull = newestseqnum + 1 - numslots;
-               long threshold = firstiffull + FREE_SLOTS;
+               long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
+               long threshold = firstiffull + FREE_SLOTS;      //we want the buffer to be clear of live entries up to this point
                
                for(; seqn < threshold; seqn++) {
                        Slot prevslot=buffer.getSlot(seqn);
                        //Push slot number forward
-                       if (!seenliveslot)
+                       if (! seenliveslot)
                                lastliveslotseqn = seqn;
 
-                       if (!prevslot.isLive())
+                       if (! prevslot.isLive())
                                continue;
                        seenliveslot = true;
                        Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
                        for(Entry liveentry:liveentries) {
                                if (s.hasSpace(liveentry)) {
                                        s.addEntry(liveentry);
-                               } else if (seqn==firstiffull) {
+                               } else if (seqn==firstiffull) {   //if there's no space but the entry is about to fall off the queue
                                        if (!resize) {
-                                               System.out.print("B");
+                                               System.out.print("B"); //?
                                                return tryput(key, value, true);
                                        }
                                }
@@ -165,6 +208,9 @@ final public class Table {
                        insertedkv=true;
                }
 
+               /* now go through live entries from least to greatest sequence number until
+                * either all live slots added, or the slot doesn't have enough room
+                * for SKIP_THRESHOLD consecutive entries*/
                int skipcount=0;
                search:
                for(; seqn <= newestseqnum; seqn++) {
@@ -192,11 +238,16 @@ final public class Table {
                if (resize)
                        max = newsize;
                Slot[] array=cloud.putSlot(s, max);
-               if (array == null)
+               if (array == null) {
                        array = new Slot[] {s};
-               else
+                       rejectedmessagelist.clear();
+               }       else {
+                       if (array.length == 0)
+                               throw new Error("Server Error: Did not send any slots");
+                       rejectedmessagelist.add(s.getSequenceNumber());
                        insertedkv=false;
-
+               }
+               
                /* update data structure */
                validateandupdate(array, true);
 
@@ -206,8 +257,7 @@ final public class Table {
        private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
                /* The cloud communication layer has checked slot HMACs already
                         before decoding */
-               if (newslots.length==0)
-                       return;
+               if (newslots.length==0) return;
 
                long firstseqnum=newslots[0].getSequenceNumber();
                if (firstseqnum <= sequencenumber)
@@ -216,7 +266,7 @@ final public class Table {
                SlotIndexer indexer = new SlotIndexer(newslots, buffer);
                checkHMACChain(indexer, newslots);
 
-               HashSet<Long> machineSet=new HashSet<Long>(lastmessagetable.keySet());
+               HashSet<Long> machineSet=new HashSet<Long>(lastmessagetable.keySet());  //
 
                initExpectedSize(firstseqnum);
                for(Slot slot: newslots) {
@@ -250,7 +300,7 @@ final public class Table {
 
        private void initExpectedSize(long firstsequencenumber) {
                long prevslots = firstsequencenumber;
-               expectedsize = (prevslots < ((long) numslots))?(int) prevslots:numslots;
+               expectedsize = (prevslots < ((long) numslots))? (int) prevslots : numslots;
                currmaxsize = numslots;
        }
 
@@ -290,18 +340,18 @@ final public class Table {
                long newseqnum=entry.getNewSeqNum();
                boolean isequal=entry.getEqual();
                long machineid=entry.getMachineID();
-               for(long seqnum=oldseqnum; seqnum<=newseqnum; seqnum++) {
+               for(long seqnum=oldseqnum; seqnum <= newseqnum; seqnum++) {
                        Slot slot=indexer.getSlot(seqnum);
                        if (slot != null) {
                                long slotmachineid=slot.getMachineID();
-                               if (isequal!=(slotmachineid==machineid)) {
+                               if (isequal != (slotmachineid==machineid)) {
                                        throw new Error("Server Error: Trying to insert rejected message for slot "+seqnum);
                                }
                        }
                }
 
                HashSet<Long> watchset=new HashSet<Long>();
-               for(Map.Entry<Long, Pair<Long,Liveness> > lastmsg_entry:lastmessagetable.entrySet()) {
+               for(Map.Entry<Long, Pair<Long,Liveness> > lastmsg_entry : lastmessagetable.entrySet()) {
                        long entry_mid=lastmsg_entry.getKey();
                        /* We've seen it, don't need to continue to watch.  Our next
                         * message will implicitly acknowledge it. */