edits
[iotcloud.git] / src / java / iotcloud / Table.java
index 05a85f8267e4592a321c4d3cb0c77f1c52c3c8c2..bd9e15b96a1aa2974f277ac568bb4ddde8275473 100644 (file)
@@ -1,5 +1,7 @@
 package iotcloud;
 import java.util.HashMap;
+import java.util.Map;
+import java.util.Iterator;
 import java.util.HashSet;
 import java.util.Arrays;
 import java.util.Vector;
@@ -15,6 +17,7 @@ final public class Table {
        private int numslots;
        private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
        private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
+       private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
        private SlotBuffer buffer;
        private CloudComm cloud;
        private long sequencenumber;
@@ -179,7 +182,7 @@ search:
                checkHMACChain(indexer, newslots);
 
                HashSet<Long> machineSet=new HashSet<Long>(lastmessagetable.keySet());
-               
+
                initExpectedSize();
                for(Slot slot: newslots) {
                        updateExpectedSize();
@@ -259,6 +262,32 @@ search:
                                }
                        }
                }
+
+               HashSet<Long> watchset=new HashSet<Long>();
+               for(Map.Entry<Long, Pair<Long,Liveness> > lastmsg_entry:lastmessagetable.entrySet()) {
+                       long entry_mid=lastmsg_entry.getKey();
+                       /* We've seen it, don't need to continue to watch.  Our next
+                        * message will implicitly acknowledge it. */
+                       if (entry_mid == localmachineid)
+                               continue;
+                       Pair<Long, Liveness> v=lastmsg_entry.getValue();
+                       long entry_seqn=v.getFirst();
+                       if (entry_seqn < newseqnum) {
+                               addWatchList(entry_mid, entry);
+                               watchset.add(entry_mid);
+                       }
+               }
+               if (watchset.isEmpty())
+                       entry.setDead();
+               else
+                       entry.setWatchSet(watchset);
+       }
+
+       private void addWatchList(long machineid, RejectedMessage entry) {
+               HashSet<RejectedMessage> entries=watchlist.get(machineid);
+               if (entries == null)
+                       watchlist.put(machineid, entries=new HashSet<RejectedMessage>());
+               entries.add(entry);
        }
 
        private void processEntry(TableStatus entry, SlotIndexer indexer) {
@@ -271,6 +300,20 @@ search:
 
        private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
                machineSet.remove(machineid);
+
+               HashSet<RejectedMessage> watchset=watchlist.get(machineid);
+               if (watchset != null) {
+                       for(Iterator<RejectedMessage> rmit=watchset.iterator(); rmit.hasNext(); ) {
+                               RejectedMessage rm=rmit.next();
+                               if (rm.getNewSeqNum() <= seqnum) {
+                                       /* Remove it from our watchlist */
+                                       rmit.remove();
+                                       /* Decrement machines that need to see this notification */
+                                       rm.removeWatcher(machineid);
+                               }
+                       }
+               }
+
                Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
                if (lastmsgentry == null)
                        return;