--amend
[iotcloud.git] / src / java / iotcloud / Table.java
index c31aed86851d4e273eb950b0656f3223a54a7698..d3bac3dd40a913a6f71c47495c53e8174991d13a 100644 (file)
@@ -22,14 +22,15 @@ final public class Table {
                localmachineid=_localmachineid;
                buffer = new SlotBuffer();
                numslots = buffer.capacity();
-               sequencenumber = 1;
+               sequencenumber = 0;
                initCloud(baseurl, password);
        }
 
        public Table(CloudComm _cloud, long _localmachineid) {
                localmachineid=_localmachineid;
                buffer = new SlotBuffer();
-               sequencenumber = 1;
+               numslots = buffer.capacity();
+               sequencenumber = 0;
                cloud=_cloud;
        }
        
@@ -60,8 +61,8 @@ final public class Table {
        }
 
        public void update() {
-               Slot[] newslots=cloud.getSlots(sequencenumber);
-               validateandupdate(newslots);
+               Slot[] newslots=cloud.getSlots(sequencenumber+1);
+               validateandupdate(newslots, false);
        }
 
        public IoTString get(IoTString key) {
@@ -79,7 +80,7 @@ final public class Table {
     Slot[] array=cloud.putSlot(s, numslots);
     if (array == null) {
       array = new Slot[] {s};
-                       validateandupdate(array); // update data structure
+                       validateandupdate(array, true); // update data structure
                } else {
                        throw new Error("Error on initialization");
                }
@@ -89,7 +90,10 @@ final public class Table {
     while(true) {
       KeyValue oldvalue=table.get(key);
       if (tryput(key, value, false)) {
-        return oldvalue.getValue();
+                               if (oldvalue==null)
+                                       return null;
+                               else
+                                       return oldvalue.getValue();
       }
     }
   }
@@ -112,6 +116,8 @@ final public class Table {
           continue;
         Vector<Entry> liveentries = prevslot.getLiveEntries();
         for(Entry liveentry:liveentries) {
+                                       if (redundant(liveentry))
+                                               continue;
           if (s.hasSpace(liveentry))
             s.addEntry(liveentry);
           else if (i==0) {
@@ -139,6 +145,8 @@ final public class Table {
         continue;
       Vector<Entry> liveentries = prevslot.getLiveEntries();
       for(Entry liveentry:liveentries) {
+                               if (redundant(liveentry))
+                                       continue;
         if (s.hasSpace(liveentry))
           s.addEntry(liveentry);
         else
@@ -155,19 +163,28 @@ final public class Table {
     else
       insertedkv=false;
     
-               validateandupdate(array); // update data structure
+               validateandupdate(array, true); // update data structure
     
     return insertedkv;
        }
 
-       private void validateandupdate(Slot[] newslots) {
+       boolean redundant(Entry liveentry) {
+               if (liveentry.getType()==Entry.TypeLastMessage) {
+                       LastMessage lastmsg=(LastMessage) liveentry;
+                       return lastmsg.getMachineID() == localmachineid;
+               }
+               return false;
+       }
+
+       
+       private void validateandupdate(Slot[] newslots, boolean isput) {
                //The cloud communication layer has checked slot HMACs already
                //before decoding
                if (newslots.length==0)
                        return;
 
                long firstseqnum=newslots[0].getSequenceNumber();
-               if (firstseqnum < sequencenumber)
+               if (firstseqnum <= sequencenumber)
                        throw new Error("Server Error: Sent older slots!");
 
                SlotIndexer indexer = new SlotIndexer(newslots, buffer);
@@ -176,26 +193,32 @@ final public class Table {
     initExpectedSize();
     for(Slot slot: newslots) {
       updateExpectedSize();
-                       processSlot(indexer, slot);
+                       processSlot(indexer, slot, isput);
                }
-    checkNumSlots(newslots.length);
+
+               //If there is a gap, check to see if the server sent us everything
+               if (firstseqnum != (sequencenumber+1))
+                       checkNumSlots(newslots.length);
+               
     commitNewMaxSize();
 
     //commit new to slots
     for(Slot slot:newslots) {
       buffer.putSlot(slot);
     }
+               sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
        }
 
   private int expectedsize, currmaxsize;
 
   private void checkNumSlots(int numslots) {
     if (numslots != expectedsize)
-      throw new Error("Server Error: Server did not send all slots");
+      throw new Error("Server Error: Server did not send all slots.  Expected: "+expectedsize+" Received:"+numslots);
   }
   
   private void initExpectedSize() {
-    expectedsize = (sequencenumber < ((long) numslots)) ? (int) sequencenumber : numslots;
+               long prevslots = sequencenumber;
+    expectedsize = (prevslots < ((long) numslots)) ? (int) prevslots : numslots;
     currmaxsize = numslots;
   }
 
@@ -226,7 +249,7 @@ final public class Table {
        }
 
        private void processEntry(LastMessage entry, SlotIndexer indexer) {
-               updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry);
+               updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false);
        }
 
        private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
@@ -253,7 +276,7 @@ final public class Table {
     lastTableStatus = entry;
        }
 
-       private void updateLastMessage(long machineid, long seqnum, Liveness liveness) {
+       private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean isput) {
                Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
                if (lastmsgentry == null)
                        return;
@@ -269,7 +292,7 @@ final public class Table {
                }
 
                if (machineid == localmachineid) {
-                       if (lastmsgseqnum != seqnum)
+                       if (lastmsgseqnum != seqnum && !isput)
                                throw new Error("Server Error: Mismatch on local machine sequence number");
                } else {
                        if (lastmsgseqnum > seqnum)
@@ -277,8 +300,8 @@ final public class Table {
                }
        }
 
-       private void processSlot(SlotIndexer indexer, Slot slot) {
-               updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot);
+       private void processSlot(SlotIndexer indexer, Slot slot, boolean isput) {
+               updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, isput);
     
                for(Entry entry : slot.getEntries()) {
                        switch(entry.getType()) {