more changes
[iotcloud.git] / src / java / iotcloud / Table.java
index 00680abeb51b8dea56e81506a243e6c35a1eb36a..d4da5db5d78aed45aa8d2610b50b40a89ce0ec8e 100644 (file)
@@ -4,16 +4,18 @@ import java.util.Arrays;
 import javax.crypto.spec.*;
 import javax.crypto.*;
 
-public class Table {
-       int numslots;
-       HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
-       HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
-       SlotBuffer buffer;
-       CloudComm cloud;
+final public class Table {
+       private int numslots;
+       private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
+       private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
+       private SlotBuffer buffer;
+       private CloudComm cloud;
        private Mac hmac;
-       long sequencenumber;
-       long localmachineid;
-
+       private long sequencenumber;
+       private long localmachineid;
+  private TableStatus lastTableStatus;
+  static final int FREE_SLOTS = 10;
+  
        public Table(String baseurl, String password, long _localmachineid) {
                localmachineid=_localmachineid;
                buffer = new SlotBuffer();
@@ -61,10 +63,24 @@ public class Table {
        }
 
        public IoTString put(IoTString key, IoTString value) {
-               return null;
+    Slot s=new Slot(sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
+
+    if ((numslots - buffer.size()) < FREE_SLOTS) {
+      //have to check whether we have enough free slots
+      long seqn = buffer.getNewestSeqNum() + 1 - numslots;
+      for(int i=0; i < FREE_SLOTS; i++, seqn--) {
+        Slot prevslot=buffer.getSlot(seqn);
+        if (!prevslot.isLive())
+          continue;
+        
+      }
+    }
+
+
+    return null;
        }
 
-       void validateandupdate(Slot[] newslots) {
+       private void validateandupdate(Slot[] newslots) {
                //The cloud communication layer has checked slot HMACs already
                //before decoding
                if (newslots.length==0)
@@ -76,26 +92,64 @@ public class Table {
 
                SlotIndexer indexer = new SlotIndexer(newslots, buffer);
                checkHMACChain(indexer, newslots);
-               for(Slot slot: newslots) {
+
+    initExpectedSize();
+    for(Slot slot: newslots) {
+      updateExpectedSize();
                        processSlot(indexer, slot);
                }
+    checkNumSlots(newslots.length);
+    commitNewMaxSize();
 
+    //commit new to slots
+    for(Slot slot:newslots) {
+      buffer.putSlot(slot);
+    }
        }
 
-       void processEntry(KeyValue entry, SlotIndexer indexer) {
+  private int expectedsize, currmaxsize;
+
+  private void checkNumSlots(int numslots) {
+    if (numslots != expectedsize)
+      throw new Error("Server Error: Server did not send all slots");
+  }
+  
+  private void initExpectedSize() {
+    expectedsize = (sequencenumber < ((long) numslots)) ? (int) sequencenumber : numslots;
+    currmaxsize = numslots;
+  }
+
+  private void updateExpectedSize() {
+    expectedsize++;
+    if (expectedsize > currmaxsize)
+      expectedsize = currmaxsize;
+  }
+
+  private void updateCurrMaxSize(int newmaxsize) {
+    currmaxsize=newmaxsize;
+  }
+
+  private void commitNewMaxSize() {
+    if (numslots != currmaxsize)
+      buffer.resize(currmaxsize);
+
+    numslots=currmaxsize;
+  }
+  
+       private void processEntry(KeyValue entry, SlotIndexer indexer) {
                IoTString key=entry.getKey();
                KeyValue oldvalue=table.get(key);
                if (oldvalue != null) {
-                       oldvalue.decrementLiveCount();
+                       oldvalue.setDead();
                }
                table.put(key, entry);
        }
 
-       void processEntry(LastMessage entry, SlotIndexer indexer) {
+       private void processEntry(LastMessage entry, SlotIndexer indexer) {
                updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry);
        }
 
-       void processEntry(RejectedMessage entry, SlotIndexer indexer) {
+       private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
                long oldseqnum=entry.getOldSeqNum();
                long newseqnum=entry.getNewSeqNum();
                boolean isequal=entry.getEqual();
@@ -111,11 +165,15 @@ public class Table {
                }
        }
 
-       void processEntry(TableStatus entry, SlotIndexer indexer, Slot slot) {
-
+       private void processEntry(TableStatus entry, SlotIndexer indexer) {
+    int newnumslots=entry.getMaxSlots();
+    updateCurrMaxSize(newnumslots);
+    if (lastTableStatus != null)
+      lastTableStatus.setDead();
+    lastTableStatus = entry;
        }
 
-       void updateLastMessage(long machineid, long seqnum, Liveness liveness) {
+       private void updateLastMessage(long machineid, long seqnum, Liveness liveness) {
                Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
                if (lastmsgentry == null)
                        return;
@@ -123,9 +181,9 @@ public class Table {
                long lastmsgseqnum = lastmsgentry.getFirst();
                Liveness lastentry = lastmsgentry.getSecond();
                if (lastentry instanceof LastMessage) {
-                       ((LastMessage)lastentry).decrementLiveCount();
+                       ((LastMessage)lastentry).setDead();
                } else if (lastentry instanceof Slot) {
-                       ((Slot)lastentry).decrementLiveCount();
+                       ((Slot)lastentry).setDead();
                } else {
                        throw new Error("Unrecognized type");
                }
@@ -135,13 +193,13 @@ public class Table {
                                throw new Error("Server Error: Mismatch on local machine sequence number");
                } else {
                        if (lastmsgseqnum > seqnum)
-                               throw new Error("Server Error: Rolback on remote machine sequence number");
+                               throw new Error("Server Error: Rollback on remote machine sequence number");
                }
        }
 
-       void processSlot(SlotIndexer indexer, Slot slot) {
+       private void processSlot(SlotIndexer indexer, Slot slot) {
                updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot);
-
+    
                for(Entry entry : slot.getEntries()) {
                        switch(entry.getType()) {
                        case Entry.TypeKeyValue:
@@ -157,7 +215,7 @@ public class Table {
                                break;
 
                        case Entry.TypeTableStatus:
-                               processEntry((TableStatus)entry, indexer, slot);
+                               processEntry((TableStatus)entry, indexer);
                                break;
 
                        default:
@@ -166,7 +224,7 @@ public class Table {
                }
        }
 
-       void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
+       private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
                for(int i=0; i < newslots.length; i++) {
                        Slot currslot=newslots[i];
                        Slot prevslot=indexer.getSlot(currslot.getSequenceNumber()-1);