Add support for salts and crypto
[iotcloud.git] / src / java / iotcloud / Table.java
index d4a62c6abda0576fcd9e0dd04180ce3cc43cb47b..028025f04ffcb8720f05e623ea55dc8d0efaa361 100644 (file)
@@ -1,12 +1,23 @@
 package iotcloud;
 import java.util.HashMap;
+import java.util.Map;
+import java.util.Iterator;
+import java.util.HashSet;
 import java.util.Arrays;
 import java.util.Vector;
 
+/**
+ * IoTTable data structure.  Provides client inferface.
+ * @author Brian Demsky
+ * @version 1.0
+ */
+
+
 final public class Table {
        private int numslots;
        private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
        private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
+       private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
        private SlotBuffer buffer;
        private CloudComm cloud;
        private long sequencenumber;
@@ -31,6 +42,11 @@ final public class Table {
                cloud=_cloud;
        }
 
+       public void rebuild() {
+               Slot[] newslots=cloud.getSlots(sequencenumber+1);
+               validateandupdate(newslots, true);
+       }
+
        public void update() {
                Slot[] newslots=cloud.getSlots(sequencenumber+1);
 
@@ -46,18 +62,24 @@ final public class Table {
        }
 
        public void initTable() {
+               cloud.setSalt();//Set the salt
                Slot s=new Slot(1, localmachineid);
                TableStatus status=new TableStatus(s, numslots);
                s.addEntry(status);
                Slot[] array=cloud.putSlot(s, numslots);
                if (array == null) {
                        array = new Slot[] {s};
-                       validateandupdate(array, true);                                                                         // update data structure
+                       /* update data structure */
+                       validateandupdate(array, true);
                } else {
                        throw new Error("Error on initialization");
                }
        }
 
+       public String toString() {
+               return table.toString();
+       }
+       
        public IoTString put(IoTString key, IoTString value) {
                while(true) {
                        KeyValue oldvalue=table.get(key);
@@ -80,7 +102,7 @@ final public class Table {
                }
 
                if ((numslots - buffer.size()) < FREE_SLOTS) {
-                       //have to check whether we have enough free slots
+                       /* have to check whether we have enough free slots */
                        long fullfirstseqn = buffer.getNewestSeqNum() + 1 - numslots;
                        seqn = fullfirstseqn < 1?1:fullfirstseqn;
                        for(int i=0; i < FREE_SLOTS; i++, seqn++) {
@@ -89,8 +111,6 @@ final public class Table {
                                        continue;
                                Vector<Entry> liveentries = prevslot.getLiveEntries();
                                for(Entry liveentry:liveentries) {
-                                       if (redundant(liveentry))
-                                               continue;
                                        if (s.hasSpace(liveentry))
                                                s.addEntry(liveentry);
                                        else if (i==0) {
@@ -118,8 +138,6 @@ search:
                                continue;
                        Vector<Entry> liveentries = prevslot.getLiveEntries();
                        for(Entry liveentry:liveentries) {
-                               if (redundant(liveentry))
-                                       continue;
                                if (s.hasSpace(liveentry))
                                        s.addEntry(liveentry);
                                else
@@ -136,23 +154,15 @@ search:
                else
                        insertedkv=false;
 
-               validateandupdate(array, true);                                                 // update data structure
+               /* update data structure */
+               validateandupdate(array, true);
 
                return insertedkv;
        }
 
-       boolean redundant(Entry liveentry) {
-               if (liveentry.getType()==Entry.TypeLastMessage) {
-                       LastMessage lastmsg=(LastMessage) liveentry;
-                       return lastmsg.getMachineID() == localmachineid;
-               }
-               return false;
-       }
-
-
-       private void validateandupdate(Slot[] newslots, boolean isput) {
-               //The cloud communication layer has checked slot HMACs already
-               //before decoding
+       private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
+               /* The cloud communication layer has checked slot HMACs already
+                        before decoding */
                if (newslots.length==0)
                        return;
 
@@ -163,19 +173,24 @@ search:
                SlotIndexer indexer = new SlotIndexer(newslots, buffer);
                checkHMACChain(indexer, newslots);
 
+               HashSet<Long> machineSet=new HashSet<Long>(lastmessagetable.keySet());
+
                initExpectedSize();
                for(Slot slot: newslots) {
                        updateExpectedSize();
-                       processSlot(indexer, slot, isput);
+                       processSlot(indexer, slot, acceptupdatestolocal, machineSet);
                }
 
-               //If there is a gap, check to see if the server sent us everything
-               if (firstseqnum != (sequencenumber+1))
+               /* If there is a gap, check to see if the server sent us everything. */
+               if (firstseqnum != (sequencenumber+1)) {
                        checkNumSlots(newslots.length);
+                       if (!machineSet.isEmpty())
+                               throw new Error("Missing record for machines: "+machineSet);
+               }
 
                commitNewMaxSize();
 
-               //commit new to slots
+               /* Commit new to slots. */
                for(Slot slot:newslots) {
                        buffer.putSlot(slot);
                }
@@ -221,8 +236,8 @@ search:
                table.put(key, entry);
        }
 
-       private void processEntry(LastMessage entry, SlotIndexer indexer) {
-               updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false);
+       private void processEntry(LastMessage entry, SlotIndexer indexer, HashSet<Long> machineSet) {
+               updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
        }
 
        private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
@@ -239,6 +254,32 @@ search:
                                }
                        }
                }
+
+               HashSet<Long> watchset=new HashSet<Long>();
+               for(Map.Entry<Long, Pair<Long,Liveness> > lastmsg_entry:lastmessagetable.entrySet()) {
+                       long entry_mid=lastmsg_entry.getKey();
+                       /* We've seen it, don't need to continue to watch.  Our next
+                        * message will implicitly acknowledge it. */
+                       if (entry_mid == localmachineid)
+                               continue;
+                       Pair<Long, Liveness> v=lastmsg_entry.getValue();
+                       long entry_seqn=v.getFirst();
+                       if (entry_seqn < newseqnum) {
+                               addWatchList(entry_mid, entry);
+                               watchset.add(entry_mid);
+                       }
+               }
+               if (watchset.isEmpty())
+                       entry.setDead();
+               else
+                       entry.setWatchSet(watchset);
+       }
+
+       private void addWatchList(long machineid, RejectedMessage entry) {
+               HashSet<RejectedMessage> entries=watchlist.get(machineid);
+               if (entries == null)
+                       watchlist.put(machineid, entries=new HashSet<RejectedMessage>());
+               entries.add(entry);
        }
 
        private void processEntry(TableStatus entry, SlotIndexer indexer) {
@@ -249,23 +290,52 @@ search:
                lastTableStatus = entry;
        }
 
-       private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean isput) {
+       private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
+               machineSet.remove(machineid);
+
+               HashSet<RejectedMessage> watchset=watchlist.get(machineid);
+               if (watchset != null) {
+                       for(Iterator<RejectedMessage> rmit=watchset.iterator(); rmit.hasNext(); ) {
+                               RejectedMessage rm=rmit.next();
+                               if (rm.getNewSeqNum() <= seqnum) {
+                                       /* Remove it from our watchlist */
+                                       rmit.remove();
+                                       /* Decrement machines that need to see this notification */
+                                       rm.removeWatcher(machineid);
+                               }
+                       }
+               }
+               
+               if (machineid == localmachineid) {
+                       /* Our own messages are immediately dead. */
+                       if (liveness instanceof LastMessage) {
+                               ((LastMessage)liveness).setDead();
+                       } else if (liveness instanceof Slot) {
+                               ((Slot)liveness).setDead();
+                       } else {
+                               throw new Error("Unrecognized type");
+                       }
+               }
+               
+               
                Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
                if (lastmsgentry == null)
                        return;
 
                long lastmsgseqnum = lastmsgentry.getFirst();
                Liveness lastentry = lastmsgentry.getSecond();
-               if (lastentry instanceof LastMessage) {
-                       ((LastMessage)lastentry).setDead();
-               } else if (lastentry instanceof Slot) {
-                       ((Slot)lastentry).setDead();
-               } else {
-                       throw new Error("Unrecognized type");
+               if (machineid != localmachineid) {
+                       if (lastentry instanceof LastMessage) {
+                               ((LastMessage)lastentry).setDead();
+                       } else if (lastentry instanceof Slot) {
+                               ((Slot)lastentry).setDead();
+                       } else {
+                               throw new Error("Unrecognized type");
+                       }
                }
-
+               
                if (machineid == localmachineid) {
-                       if (lastmsgseqnum != seqnum && !isput)
+                       if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
                                throw new Error("Server Error: Mismatch on local machine sequence number");
                } else {
                        if (lastmsgseqnum > seqnum)
@@ -273,9 +343,8 @@ search:
                }
        }
 
-       private void processSlot(SlotIndexer indexer, Slot slot, boolean isput) {
-               updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, isput);
-
+       private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
+               updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
                for(Entry entry : slot.getEntries()) {
                        switch(entry.getType()) {
                        case Entry.TypeKeyValue:
@@ -283,7 +352,7 @@ search:
                                break;
 
                        case Entry.TypeLastMessage:
-                               processEntry((LastMessage)entry, indexer);
+                               processEntry((LastMessage)entry, indexer, machineSet);
                                break;
 
                        case Entry.TypeRejectedMessage: