simplify code
[iotcloud.git] / src / java / iotcloud / Table.java
index c31aed86851d4e273eb950b0656f3223a54a7698..bbcfe6a5e0fb759cf01366c269009187af9e3e0c 100644 (file)
@@ -1,67 +1,68 @@
 package iotcloud;
 import java.util.HashMap;
+import java.util.Map;
+import java.util.Iterator;
+import java.util.HashSet;
 import java.util.Arrays;
 import java.util.Vector;
-import javax.crypto.spec.*;
-import javax.crypto.*;
+import java.util.Random;
+
+/**
+ * IoTTable data structure.  Provides client inferface.
+ * @author Brian Demsky
+ * @version 1.0
+ */
 
 final public class Table {
        private int numslots;
        private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
        private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
+       private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
+       private Vector<Long> rejectedmessagelist=new Vector<Long>();
        private SlotBuffer buffer;
        private CloudComm cloud;
-       private Mac hmac;
        private long sequencenumber;
        private long localmachineid;
-  private TableStatus lastTableStatus;
-  static final int FREE_SLOTS = 10;
-  static final int FORCED_RESIZE_INCREMENT = 20;
-  
+       private TableStatus lastTableStatus;
+       static final int FREE_SLOTS = 10;
+       static final int SKIP_THRESHOLD = 10;
+       private long liveslotcount=0;
+       private int chance;
+       static final double RESIZE_MULTIPLE = 1.2;
+       static final double RESIZE_THRESHOLD = 0.75;
+       static final int REJECTED_THRESHOLD = 5;
+       private int resizethreshold;
+       private long lastliveslotseqn;
+       private Random random=new Random();
+       
        public Table(String baseurl, String password, long _localmachineid) {
                localmachineid=_localmachineid;
                buffer = new SlotBuffer();
                numslots = buffer.capacity();
-               sequencenumber = 1;
-               initCloud(baseurl, password);
+               setResizeThreshold();
+               sequencenumber = 0;
+               cloud=new CloudComm(this, baseurl, password);
+               lastliveslotseqn = 1;
        }
 
        public Table(CloudComm _cloud, long _localmachineid) {
                localmachineid=_localmachineid;
                buffer = new SlotBuffer();
-               sequencenumber = 1;
+               numslots = buffer.capacity();
+               setResizeThreshold();
+               sequencenumber = 0;
                cloud=_cloud;
        }
-       
-       private void initCloud(String baseurl, String password) {
-               try {
-                       SecretKeySpec secret=getKey(password);
-                       Cipher encryptCipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
-                       encryptCipher.init(Cipher.ENCRYPT_MODE, secret);
-                       Cipher decryptCipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
-                       decryptCipher.init(Cipher.DECRYPT_MODE, secret);
-                       hmac = Mac.getInstance("HmacSHA256");
-                       hmac.init(secret);
-                       cloud=new CloudComm(baseurl, encryptCipher, decryptCipher, hmac);
-               } catch (Exception e) {
-                       throw new Error("Failed To Initialize Ciphers");
-               }
-       }
 
-       private SecretKeySpec getKey(String password) {
-               try {
-                       PBEKeySpec keyspec = new PBEKeySpec(password.toCharArray());
-                       SecretKey key = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256").generateSecret(keyspec);
-                       SecretKeySpec secret = new SecretKeySpec(key.getEncoded(), "AES");
-                       return secret;
-               } catch (Exception e) {
-                       throw new Error("Failed generating key.");
-               }
+       public void rebuild() {
+               Slot[] newslots=cloud.getSlots(sequencenumber+1);
+               validateandupdate(newslots, true);
        }
 
        public void update() {
-               Slot[] newslots=cloud.getSlots(sequencenumber);
-               validateandupdate(newslots);
+               Slot[] newslots=cloud.getSlots(sequencenumber+1);
+
+               validateandupdate(newslots, false);
        }
 
        public IoTString get(IoTString key) {
@@ -73,149 +74,243 @@ final public class Table {
        }
 
        public void initTable() {
-               Slot s=new Slot(1, localmachineid);
+               cloud.setSalt();//Set the salt
+               Slot s=new Slot(this, 1, localmachineid);
                TableStatus status=new TableStatus(s, numslots);
                s.addEntry(status);
-    Slot[] array=cloud.putSlot(s, numslots);
-    if (array == null) {
-      array = new Slot[] {s};
-                       validateandupdate(array); // update data structure
+               Slot[] array=cloud.putSlot(s, numslots);
+               if (array == null) {
+                       array = new Slot[] {s};
+                       /* update data structure */
+                       validateandupdate(array, true);
                } else {
                        throw new Error("Error on initialization");
                }
        }
+
+       public String toString() {
+               return table.toString();
+       }
        
        public IoTString put(IoTString key, IoTString value) {
-    while(true) {
-      KeyValue oldvalue=table.get(key);
-      if (tryput(key, value, false)) {
-        return oldvalue.getValue();
-      }
-    }
-  }
-
-  private boolean tryput(IoTString key, IoTString value, boolean forcedresize) {
-               Slot s=new Slot(sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
-    long seqn = buffer.getOldestSeqNum();
-
-               if (forcedresize) {
-                       TableStatus status=new TableStatus(s, FORCED_RESIZE_INCREMENT + numslots);
+               while(true) {
+                       KeyValue oldvalue=table.get(key);
+                       if (tryput(key, value, false)) {
+                               if (oldvalue==null)
+                                       return null;
+                               else
+                                       return oldvalue.getValue();
+                       }
+               }
+       }
+
+       void decrementLiveCount() {
+               liveslotcount--;
+       }
+       
+       private void setResizeThreshold() {
+               int resize_lower=(int) (RESIZE_THRESHOLD * numslots);
+               resizethreshold=resize_lower-1+random.nextInt(numslots-resize_lower);
+       }
+       
+       private boolean tryput(IoTString key, IoTString value, boolean resize) {
+               Slot s=new Slot(this, sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
+               int newsize = 0;
+               if (liveslotcount > resizethreshold) {
+                       resize=true;
+                       newsize = (int) (numslots * RESIZE_MULTIPLE);
+               }
+               
+               if (resize) {
+                       newsize = (int) (numslots * RESIZE_MULTIPLE);
+                       TableStatus status=new TableStatus(s, newsize);
                        s.addEntry(status);
                }
+
+               if (!rejectedmessagelist.isEmpty()) {
+                       long old_seqn=rejectedmessagelist.firstElement();
+                       if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
+                               long new_seqn=rejectedmessagelist.lastElement();
+                               RejectedMessage rm=new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
+                               s.addEntry(rm);
+                       } else {
+                               long prev_seqn=-1;
+                               int i=0;
+                               /* Go through list of missing messages */
+                               for(;i<rejectedmessagelist.size();i++) {
+                                       long curr_seqn=rejectedmessagelist.get(i);
+                                       Slot s_msg=buffer.getSlot(curr_seqn);
+                                       if (s_msg!=null)
+                                               break;
+                                       prev_seqn=curr_seqn;
+                               }
+                               /* Generate rejected message entry for missing messages */
+                               if (prev_seqn != -1) {
+                                       RejectedMessage rm=new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
+                                       s.addEntry(rm);
+                               }
+                               /* Generate rejected message entries for present messages */
+                               for(;i<rejectedmessagelist.size();i++) {
+                                       long curr_seqn=rejectedmessagelist.get(i);
+                                       Slot s_msg=buffer.getSlot(curr_seqn);
+                                       long machineid=s_msg.getMachineID();
+                                       RejectedMessage rm=new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
+                                       s.addEntry(rm);
+                               }
+                       }
+               }
+               
+               long newestseqnum = buffer.getNewestSeqNum();
+               long oldestseqnum = buffer.getOldestSeqNum();
+               if (lastliveslotseqn < oldestseqnum)
+                       lastliveslotseqn = oldestseqnum;
+
+               long seqn = lastliveslotseqn;
+               boolean seenliveslot = false;
+               long firstiffull = newestseqnum + 1 - numslots;
+               long threshold = firstiffull + FREE_SLOTS;
                
-    if ((numslots - buffer.size()) < FREE_SLOTS) {
-      //have to check whether we have enough free slots
-      seqn = buffer.getNewestSeqNum() + 1 - numslots;
-      for(int i=0; i < FREE_SLOTS; i++, seqn++) {
-        Slot prevslot=buffer.getSlot(seqn);
-        if (!prevslot.isLive())
-          continue;
-        Vector<Entry> liveentries = prevslot.getLiveEntries();
-        for(Entry liveentry:liveentries) {
-          if (s.hasSpace(liveentry))
-            s.addEntry(liveentry);
-          else if (i==0) {
-            if (s.canFit(liveentry))
-              s.addEntry(liveentry);
-            else if (!forcedresize) {
-              return tryput(key, value, true);
-                                               }
+               for(; seqn < threshold; seqn++) {
+                       Slot prevslot=buffer.getSlot(seqn);
+                       //Push slot number forward
+                       if (!seenliveslot)
+                               lastliveslotseqn = seqn;
+
+                       if (!prevslot.isLive())
+                               continue;
+                       seenliveslot = true;
+                       Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
+                       for(Entry liveentry:liveentries) {
+                               if (s.hasSpace(liveentry)) {
+                                       s.addEntry(liveentry);
+                               } else if (seqn==firstiffull) {
+                                       if (!resize) {
+                                               System.out.print("B");
+                                               return tryput(key, value, true);
                                        }
-        }
-      }
-    }
-    KeyValue kv=new KeyValue(s, key, value);
-    boolean insertedkv=false;
-    if (s.hasSpace(kv)) {
-      s.addEntry(kv);
-      insertedkv=true;
-    }
-
-    long newestseqnum=buffer.getNewestSeqNum();
-    search:
-    for(;seqn<=newestseqnum;seqn++) {
-      Slot prevslot=buffer.getSlot(seqn);
-      if (!prevslot.isLive())
-        continue;
-      Vector<Entry> liveentries = prevslot.getLiveEntries();
-      for(Entry liveentry:liveentries) {
-        if (s.hasSpace(liveentry))
-          s.addEntry(liveentry);
-        else
-          break search;
-      }
-    }
-    
-    int max=0;
-    if (forcedresize)
-      max = numslots + FORCED_RESIZE_INCREMENT;
-    Slot[] array=cloud.putSlot(s, max);
-    if (array == null)
-      array = new Slot[] {s};
-    else
-      insertedkv=false;
-    
-               validateandupdate(array); // update data structure
-    
-    return insertedkv;
+                               }
+                       }
+               }
+
+               KeyValue kv=new KeyValue(s, key, value);
+               boolean insertedkv=false;
+               if (s.hasSpace(kv)) {
+                       s.addEntry(kv);
+                       insertedkv=true;
+               }
+
+               int skipcount=0;
+               search:
+               for(; seqn <= newestseqnum; seqn++) {
+                       Slot prevslot=buffer.getSlot(seqn);
+                       //Push slot number forward
+                       if (!seenliveslot)
+                               lastliveslotseqn = seqn;
+                       
+                       if (!prevslot.isLive())
+                               continue;
+                       seenliveslot = true;
+                       Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
+                       for(Entry liveentry:liveentries) {
+                               if (s.hasSpace(liveentry))
+                                       s.addEntry(liveentry);
+                               else {
+                                       skipcount++;
+                                       if (skipcount > SKIP_THRESHOLD)
+                                               break search;
+                               }
+                       }
+               }
+
+               int max=0;
+               if (resize)
+                       max = newsize;
+               Slot[] array=cloud.putSlot(s, max);
+               if (array == null) {
+                       array = new Slot[] {s};
+                       rejectedmessagelist.clear();
+               }       else {
+                       if (array.length == 0)
+                               throw new Error("Server Error: Did not send any slots");
+                       rejectedmessagelist.add(s.getSequenceNumber());
+                       insertedkv=false;
+               }
+               
+               /* update data structure */
+               validateandupdate(array, true);
+
+               return insertedkv;
        }
 
-       private void validateandupdate(Slot[] newslots) {
-               //The cloud communication layer has checked slot HMACs already
-               //before decoding
+       private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
+               /The cloud communication layer has checked slot HMACs already
+                        before decoding */
                if (newslots.length==0)
                        return;
 
                long firstseqnum=newslots[0].getSequenceNumber();
-               if (firstseqnum < sequencenumber)
+               if (firstseqnum <= sequencenumber)
                        throw new Error("Server Error: Sent older slots!");
 
                SlotIndexer indexer = new SlotIndexer(newslots, buffer);
                checkHMACChain(indexer, newslots);
 
-    initExpectedSize();
-    for(Slot slot: newslots) {
-      updateExpectedSize();
-                       processSlot(indexer, slot);
+               HashSet<Long> machineSet=new HashSet<Long>(lastmessagetable.keySet());
+
+               initExpectedSize(firstseqnum);
+               for(Slot slot: newslots) {
+                       processSlot(indexer, slot, acceptupdatestolocal, machineSet);
+                       updateExpectedSize();
+               }
+
+               /* If there is a gap, check to see if the server sent us everything. */
+               if (firstseqnum != (sequencenumber+1)) {
+                       checkNumSlots(newslots.length);
+                       if (!machineSet.isEmpty())
+                               throw new Error("Missing record for machines: "+machineSet);
                }
-    checkNumSlots(newslots.length);
-    commitNewMaxSize();
 
-    //commit new to slots
-    for(Slot slot:newslots) {
-      buffer.putSlot(slot);
-    }
+               commitNewMaxSize();
+
+               /* Commit new to slots. */
+               for(Slot slot:newslots) {
+                       buffer.putSlot(slot);
+                       liveslotcount++;
+               }
+               sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
+       }
+
+       private int expectedsize, currmaxsize;
+
+       private void checkNumSlots(int numslots) {
+               if (numslots != expectedsize)
+                       throw new Error("Server Error: Server did not send all slots.  Expected: "+expectedsize+" Received:"+numslots);
+       }
+
+       private void initExpectedSize(long firstsequencenumber) {
+               long prevslots = firstsequencenumber;
+               expectedsize = (prevslots < ((long) numslots))?(int) prevslots:numslots;
+               currmaxsize = numslots;
+       }
+
+       private void updateExpectedSize() {
+               expectedsize++;
+               if (expectedsize > currmaxsize)
+                       expectedsize = currmaxsize;
+       }
+
+       private void updateCurrMaxSize(int newmaxsize) {
+               currmaxsize=newmaxsize;
+       }
+
+       private void commitNewMaxSize() {
+               if (numslots != currmaxsize)
+                       buffer.resize(currmaxsize);
+
+               numslots=currmaxsize;
+               setResizeThreshold();
        }
 
-  private int expectedsize, currmaxsize;
-
-  private void checkNumSlots(int numslots) {
-    if (numslots != expectedsize)
-      throw new Error("Server Error: Server did not send all slots");
-  }
-  
-  private void initExpectedSize() {
-    expectedsize = (sequencenumber < ((long) numslots)) ? (int) sequencenumber : numslots;
-    currmaxsize = numslots;
-  }
-
-  private void updateExpectedSize() {
-    expectedsize++;
-    if (expectedsize > currmaxsize)
-      expectedsize = currmaxsize;
-  }
-
-  private void updateCurrMaxSize(int newmaxsize) {
-    currmaxsize=newmaxsize;
-  }
-
-  private void commitNewMaxSize() {
-    if (numslots != currmaxsize)
-      buffer.resize(currmaxsize);
-
-    numslots=currmaxsize;
-  }
-  
        private void processEntry(KeyValue entry, SlotIndexer indexer) {
                IoTString key=entry.getKey();
                KeyValue oldvalue=table.get(key);
@@ -225,8 +320,8 @@ final public class Table {
                table.put(key, entry);
        }
 
-       private void processEntry(LastMessage entry, SlotIndexer indexer) {
-               updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry);
+       private void processEntry(LastMessage entry, SlotIndexer indexer, HashSet<Long> machineSet) {
+               updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
        }
 
        private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
@@ -243,33 +338,88 @@ final public class Table {
                                }
                        }
                }
+
+               HashSet<Long> watchset=new HashSet<Long>();
+               for(Map.Entry<Long, Pair<Long,Liveness> > lastmsg_entry:lastmessagetable.entrySet()) {
+                       long entry_mid=lastmsg_entry.getKey();
+                       /* We've seen it, don't need to continue to watch.  Our next
+                        * message will implicitly acknowledge it. */
+                       if (entry_mid == localmachineid)
+                               continue;
+                       Pair<Long, Liveness> v=lastmsg_entry.getValue();
+                       long entry_seqn=v.getFirst();
+                       if (entry_seqn < newseqnum) {
+                               addWatchList(entry_mid, entry);
+                               watchset.add(entry_mid);
+                       }
+               }
+               if (watchset.isEmpty())
+                       entry.setDead();
+               else
+                       entry.setWatchSet(watchset);
+       }
+
+       private void addWatchList(long machineid, RejectedMessage entry) {
+               HashSet<RejectedMessage> entries=watchlist.get(machineid);
+               if (entries == null)
+                       watchlist.put(machineid, entries=new HashSet<RejectedMessage>());
+               entries.add(entry);
        }
 
        private void processEntry(TableStatus entry, SlotIndexer indexer) {
-    int newnumslots=entry.getMaxSlots();
-    updateCurrMaxSize(newnumslots);
-    if (lastTableStatus != null)
-      lastTableStatus.setDead();
-    lastTableStatus = entry;
+               int newnumslots=entry.getMaxSlots();
+               updateCurrMaxSize(newnumslots);
+               if (lastTableStatus != null)
+                       lastTableStatus.setDead();
+               lastTableStatus = entry;
        }
 
-       private void updateLastMessage(long machineid, long seqnum, Liveness liveness) {
+       private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
+               machineSet.remove(machineid);
+
+               HashSet<RejectedMessage> watchset=watchlist.get(machineid);
+               if (watchset != null) {
+                       for(Iterator<RejectedMessage> rmit=watchset.iterator(); rmit.hasNext(); ) {
+                               RejectedMessage rm=rmit.next();
+                               if (rm.getNewSeqNum() <= seqnum) {
+                                       /* Remove it from our watchlist */
+                                       rmit.remove();
+                                       /* Decrement machines that need to see this notification */
+                                       rm.removeWatcher(machineid);
+                               }
+                       }
+               }
+               
+               if (machineid == localmachineid) {
+                       /* Our own messages are immediately dead. */
+                       if (liveness instanceof LastMessage) {
+                               ((LastMessage)liveness).setDead();
+                       } else if (liveness instanceof Slot) {
+                               ((Slot)liveness).setDead();
+                       } else {
+                               throw new Error("Unrecognized type");
+                       }
+               }
+               
+               
                Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
                if (lastmsgentry == null)
                        return;
 
                long lastmsgseqnum = lastmsgentry.getFirst();
                Liveness lastentry = lastmsgentry.getSecond();
-               if (lastentry instanceof LastMessage) {
-                       ((LastMessage)lastentry).setDead();
-               } else if (lastentry instanceof Slot) {
-                       ((Slot)lastentry).setDead();
-               } else {
-                       throw new Error("Unrecognized type");
+               if (machineid != localmachineid) {
+                       if (lastentry instanceof LastMessage) {
+                               ((LastMessage)lastentry).setDead();
+                       } else if (lastentry instanceof Slot) {
+                               ((Slot)lastentry).setDead();
+                       } else {
+                               throw new Error("Unrecognized type");
+                       }
                }
-
+               
                if (machineid == localmachineid) {
-                       if (lastmsgseqnum != seqnum)
+                       if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
                                throw new Error("Server Error: Mismatch on local machine sequence number");
                } else {
                        if (lastmsgseqnum > seqnum)
@@ -277,9 +427,8 @@ final public class Table {
                }
        }
 
-       private void processSlot(SlotIndexer indexer, Slot slot) {
-               updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot);
-    
+       private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
+               updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
                for(Entry entry : slot.getEntries()) {
                        switch(entry.getType()) {
                        case Entry.TypeKeyValue:
@@ -287,7 +436,7 @@ final public class Table {
                                break;
 
                        case Entry.TypeLastMessage:
-                               processEntry((LastMessage)entry, indexer);
+                               processEntry((LastMessage)entry, indexer, machineSet);
                                break;
 
                        case Entry.TypeRejectedMessage:
@@ -310,7 +459,7 @@ final public class Table {
                        Slot prevslot=indexer.getSlot(currslot.getSequenceNumber()-1);
                        if (prevslot != null &&
                                        !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
-                               throw new Error("Server Error: Invalid HMAC Chain");
+                               throw new Error("Server Error: Invalid HMAC Chain"+currslot+" "+prevslot);
                }
        }
 }