--amend
[iotcloud.git] / src / java / iotcloud / Table.java
index 6cd2b323d9cf6a07789f2b8dcd9da318ad7da73e..d3bac3dd40a913a6f71c47495c53e8174991d13a 100644 (file)
@@ -1,26 +1,39 @@
 package iotcloud;
 import java.util.HashMap;
 import java.util.Arrays;
+import java.util.Vector;
 import javax.crypto.spec.*;
 import javax.crypto.*;
 
-public class Table {
-       int numslots;
-       HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
-       HashMap<Long, Pair<Long, Liveness>> lastmessagetable=new HashMap<Long, Pair<Long, Liveness>>();
-       SlotBuffer buffer;
-       CloudComm cloud;
+final public class Table {
+       private int numslots;
+       private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
+       private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
+       private SlotBuffer buffer;
+       private CloudComm cloud;
        private Mac hmac;
-       long sequencenumber;
-       long localmachineid;
-       
+       private long sequencenumber;
+       private long localmachineid;
+  private TableStatus lastTableStatus;
+  static final int FREE_SLOTS = 10;
+  static final int FORCED_RESIZE_INCREMENT = 20;
+  
        public Table(String baseurl, String password, long _localmachineid) {
                localmachineid=_localmachineid;
                buffer = new SlotBuffer();
-               sequencenumber = 1;
+               numslots = buffer.capacity();
+               sequencenumber = 0;
                initCloud(baseurl, password);
        }
 
+       public Table(CloudComm _cloud, long _localmachineid) {
+               localmachineid=_localmachineid;
+               buffer = new SlotBuffer();
+               numslots = buffer.capacity();
+               sequencenumber = 0;
+               cloud=_cloud;
+       }
+       
        private void initCloud(String baseurl, String password) {
                try {
                        SecretKeySpec secret=getKey(password);
@@ -48,8 +61,8 @@ public class Table {
        }
 
        public void update() {
-               Slot[] newslots=cloud.getSlots(sequencenumber);
-               validateandupdate(newslots);
+               Slot[] newslots=cloud.getSlots(sequencenumber+1);
+               validateandupdate(newslots, false);
        }
 
        public IoTString get(IoTString key) {
@@ -60,47 +73,191 @@ public class Table {
                        return null;
        }
 
+       public void initTable() {
+               Slot s=new Slot(1, localmachineid);
+               TableStatus status=new TableStatus(s, numslots);
+               s.addEntry(status);
+    Slot[] array=cloud.putSlot(s, numslots);
+    if (array == null) {
+      array = new Slot[] {s};
+                       validateandupdate(array, true); // update data structure
+               } else {
+                       throw new Error("Error on initialization");
+               }
+       }
+       
        public IoTString put(IoTString key, IoTString value) {
-               return null;
+    while(true) {
+      KeyValue oldvalue=table.get(key);
+      if (tryput(key, value, false)) {
+                               if (oldvalue==null)
+                                       return null;
+                               else
+                                       return oldvalue.getValue();
+      }
+    }
+  }
+
+  private boolean tryput(IoTString key, IoTString value, boolean forcedresize) {
+               Slot s=new Slot(sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
+    long seqn = buffer.getOldestSeqNum();
+
+               if (forcedresize) {
+                       TableStatus status=new TableStatus(s, FORCED_RESIZE_INCREMENT + numslots);
+                       s.addEntry(status);
+               }
+               
+    if ((numslots - buffer.size()) < FREE_SLOTS) {
+      //have to check whether we have enough free slots
+      seqn = buffer.getNewestSeqNum() + 1 - numslots;
+      for(int i=0; i < FREE_SLOTS; i++, seqn++) {
+        Slot prevslot=buffer.getSlot(seqn);
+        if (!prevslot.isLive())
+          continue;
+        Vector<Entry> liveentries = prevslot.getLiveEntries();
+        for(Entry liveentry:liveentries) {
+                                       if (redundant(liveentry))
+                                               continue;
+          if (s.hasSpace(liveentry))
+            s.addEntry(liveentry);
+          else if (i==0) {
+            if (s.canFit(liveentry))
+              s.addEntry(liveentry);
+            else if (!forcedresize) {
+              return tryput(key, value, true);
+                                               }
+                                       }
+        }
+      }
+    }
+    KeyValue kv=new KeyValue(s, key, value);
+    boolean insertedkv=false;
+    if (s.hasSpace(kv)) {
+      s.addEntry(kv);
+      insertedkv=true;
+    }
+
+    long newestseqnum=buffer.getNewestSeqNum();
+    search:
+    for(;seqn<=newestseqnum;seqn++) {
+      Slot prevslot=buffer.getSlot(seqn);
+      if (!prevslot.isLive())
+        continue;
+      Vector<Entry> liveentries = prevslot.getLiveEntries();
+      for(Entry liveentry:liveentries) {
+                               if (redundant(liveentry))
+                                       continue;
+        if (s.hasSpace(liveentry))
+          s.addEntry(liveentry);
+        else
+          break search;
+      }
+    }
+    
+    int max=0;
+    if (forcedresize)
+      max = numslots + FORCED_RESIZE_INCREMENT;
+    Slot[] array=cloud.putSlot(s, max);
+    if (array == null)
+      array = new Slot[] {s};
+    else
+      insertedkv=false;
+    
+               validateandupdate(array, true); // update data structure
+    
+    return insertedkv;
        }
+
+       boolean redundant(Entry liveentry) {
+               if (liveentry.getType()==Entry.TypeLastMessage) {
+                       LastMessage lastmsg=(LastMessage) liveentry;
+                       return lastmsg.getMachineID() == localmachineid;
+               }
+               return false;
+       }
+
        
-       void validateandupdate(Slot[] newslots) {
+       private void validateandupdate(Slot[] newslots, boolean isput) {
                //The cloud communication layer has checked slot HMACs already
                //before decoding
                if (newslots.length==0)
                        return;
 
                long firstseqnum=newslots[0].getSequenceNumber();
-               if (firstseqnum < sequencenumber)
+               if (firstseqnum <= sequencenumber)
                        throw new Error("Server Error: Sent older slots!");
 
                SlotIndexer indexer = new SlotIndexer(newslots, buffer);
                checkHMACChain(indexer, newslots);
-               for(Slot slot: newslots) {
-                       processSlot(indexer, slot);
+
+    initExpectedSize();
+    for(Slot slot: newslots) {
+      updateExpectedSize();
+                       processSlot(indexer, slot, isput);
                }
+
+               //If there is a gap, check to see if the server sent us everything
+               if (firstseqnum != (sequencenumber+1))
+                       checkNumSlots(newslots.length);
                
+    commitNewMaxSize();
+
+    //commit new to slots
+    for(Slot slot:newslots) {
+      buffer.putSlot(slot);
+    }
+               sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
        }
 
-       void processEntry(KeyValue entry, SlotIndexer indexer) {
+  private int expectedsize, currmaxsize;
+
+  private void checkNumSlots(int numslots) {
+    if (numslots != expectedsize)
+      throw new Error("Server Error: Server did not send all slots.  Expected: "+expectedsize+" Received:"+numslots);
+  }
+  
+  private void initExpectedSize() {
+               long prevslots = sequencenumber;
+    expectedsize = (prevslots < ((long) numslots)) ? (int) prevslots : numslots;
+    currmaxsize = numslots;
+  }
+
+  private void updateExpectedSize() {
+    expectedsize++;
+    if (expectedsize > currmaxsize)
+      expectedsize = currmaxsize;
+  }
+
+  private void updateCurrMaxSize(int newmaxsize) {
+    currmaxsize=newmaxsize;
+  }
+
+  private void commitNewMaxSize() {
+    if (numslots != currmaxsize)
+      buffer.resize(currmaxsize);
+
+    numslots=currmaxsize;
+  }
+  
+       private void processEntry(KeyValue entry, SlotIndexer indexer) {
                IoTString key=entry.getKey();
                KeyValue oldvalue=table.get(key);
                if (oldvalue != null) {
-                       oldvalue.decrementLiveCount();
+                       oldvalue.setDead();
                }
                table.put(key, entry);
        }
 
-       void processEntry(LastMessage entry, SlotIndexer indexer) {
-               updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry);
+       private void processEntry(LastMessage entry, SlotIndexer indexer) {
+               updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false);
        }
 
-       void processEntry(RejectedMessage entry, SlotIndexer indexer) {
+       private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
                long oldseqnum=entry.getOldSeqNum();
                long newseqnum=entry.getNewSeqNum();
                boolean isequal=entry.getEqual();
                long machineid=entry.getMachineID();
-               for(long seqnum=oldseqnum;seqnum<=newseqnum;seqnum++) {
+               for(long seqnum=oldseqnum; seqnum<=newseqnum; seqnum++) {
                        Slot slot=indexer.getSlot(seqnum);
                        if (slot != null) {
                                long slotmachineid=slot.getMachineID();
@@ -110,12 +267,16 @@ public class Table {
                        }
                }
        }
-       
-       void processEntry(TableStatus entry, SlotIndexer indexer, Slot slot) {
-               
+
+       private void processEntry(TableStatus entry, SlotIndexer indexer) {
+    int newnumslots=entry.getMaxSlots();
+    updateCurrMaxSize(newnumslots);
+    if (lastTableStatus != null)
+      lastTableStatus.setDead();
+    lastTableStatus = entry;
        }
 
-       void updateLastMessage(long machineid, long seqnum, Liveness liveness) {
+       private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean isput) {
                Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
                if (lastmsgentry == null)
                        return;
@@ -123,47 +284,50 @@ public class Table {
                long lastmsgseqnum = lastmsgentry.getFirst();
                Liveness lastentry = lastmsgentry.getSecond();
                if (lastentry instanceof LastMessage) {
-                       ((LastMessage)lastentry).decrementLiveCount();
+                       ((LastMessage)lastentry).setDead();
                } else if (lastentry instanceof Slot) {
-                       ((Slot)lastentry).decrementLiveCount();
+                       ((Slot)lastentry).setDead();
                } else {
                        throw new Error("Unrecognized type");
                }
-               
-               //Check that nothing funny happened
+
                if (machineid == localmachineid) {
-                       if (lastmsgseqnum != seqnum)
+                       if (lastmsgseqnum != seqnum && !isput)
                                throw new Error("Server Error: Mismatch on local machine sequence number");
                } else {
                        if (lastmsgseqnum > seqnum)
-                               throw new Error("Server Error: Rolback on remote machine sequence number");
+                               throw new Error("Server Error: Rollback on remote machine sequence number");
                }
        }
-       
-       void processSlot(SlotIndexer indexer, Slot slot) {
-               updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot);
 
+       private void processSlot(SlotIndexer indexer, Slot slot, boolean isput) {
+               updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, isput);
+    
                for(Entry entry : slot.getEntries()) {
                        switch(entry.getType()) {
                        case Entry.TypeKeyValue:
                                processEntry((KeyValue)entry, indexer);
                                break;
+
                        case Entry.TypeLastMessage:
                                processEntry((LastMessage)entry, indexer);
                                break;
+
                        case Entry.TypeRejectedMessage:
                                processEntry((RejectedMessage)entry, indexer);
                                break;
+
                        case Entry.TypeTableStatus:
-                               processEntry((TableStatus)entry, indexer, slot);
+                               processEntry((TableStatus)entry, indexer);
                                break;
+
                        default:
                                throw new Error("Unrecognized type: "+entry.getType());
                        }
                }
        }
-       
-       void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
+
+       private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
                for(int i=0; i < newslots.length; i++) {
                        Slot currslot=newslots[i];
                        Slot prevslot=indexer.getSlot(currslot.getSequenceNumber()-1);