edits
authorbdemsky <bdemsky@uci.edu>
Mon, 25 Jul 2016 01:37:31 +0000 (18:37 -0700)
committerbdemsky <bdemsky@uci.edu>
Mon, 25 Jul 2016 01:38:05 +0000 (18:38 -0700)
src/java/iotcloud/RejectedMessage.java
src/java/iotcloud/Table.java
src/java/iotcloud/issues.txt

index 0f0bfd008ccbcd03e8196b8549d0ab0e8fb0eb8b..0e8ec4f4bc4a55bc24452dc22a3970499646958c 100644 (file)
@@ -1,5 +1,6 @@
 package iotcloud;
 import java.nio.ByteBuffer;
+import java.util.HashSet;
 
 /**
  * Entry for tracking messages that the server rejected.  We have to
@@ -20,6 +21,8 @@ class RejectedMessage extends Entry {
        /* Is the machine identifier of the relevant slots equal to (or not
         * equal to) the specified machine identifier. */
        private boolean equalto;
+       /* Set of machines that have not received notification. */
+       private HashSet<Long> watchset;
 
        RejectedMessage(Slot slot, long _machineid, long _oldseqnum, long _newseqnum, boolean _equalto) {
                super(slot);
@@ -53,6 +56,16 @@ class RejectedMessage extends Entry {
                return new RejectedMessage(slot, machineid, oldseqnum, newseqnum, equalto==1);
        }
 
+       void setWatchSet(HashSet<Long> _watchset) {
+               watchset=_watchset;
+       }
+
+       void removeWatcher(long machineid) {
+               if (watchset.remove(machineid))
+                       if (watchset.isEmpty())
+                               setDead();
+       }
+
        void encode(ByteBuffer bb) {
                bb.put(Entry.TypeRejectedMessage);
                bb.putLong(machineid);
index 05a85f8267e4592a321c4d3cb0c77f1c52c3c8c2..bd9e15b96a1aa2974f277ac568bb4ddde8275473 100644 (file)
@@ -1,5 +1,7 @@
 package iotcloud;
 import java.util.HashMap;
+import java.util.Map;
+import java.util.Iterator;
 import java.util.HashSet;
 import java.util.Arrays;
 import java.util.Vector;
@@ -15,6 +17,7 @@ final public class Table {
        private int numslots;
        private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
        private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
+       private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
        private SlotBuffer buffer;
        private CloudComm cloud;
        private long sequencenumber;
@@ -179,7 +182,7 @@ search:
                checkHMACChain(indexer, newslots);
 
                HashSet<Long> machineSet=new HashSet<Long>(lastmessagetable.keySet());
-               
+
                initExpectedSize();
                for(Slot slot: newslots) {
                        updateExpectedSize();
@@ -259,6 +262,32 @@ search:
                                }
                        }
                }
+
+               HashSet<Long> watchset=new HashSet<Long>();
+               for(Map.Entry<Long, Pair<Long,Liveness> > lastmsg_entry:lastmessagetable.entrySet()) {
+                       long entry_mid=lastmsg_entry.getKey();
+                       /* We've seen it, don't need to continue to watch.  Our next
+                        * message will implicitly acknowledge it. */
+                       if (entry_mid == localmachineid)
+                               continue;
+                       Pair<Long, Liveness> v=lastmsg_entry.getValue();
+                       long entry_seqn=v.getFirst();
+                       if (entry_seqn < newseqnum) {
+                               addWatchList(entry_mid, entry);
+                               watchset.add(entry_mid);
+                       }
+               }
+               if (watchset.isEmpty())
+                       entry.setDead();
+               else
+                       entry.setWatchSet(watchset);
+       }
+
+       private void addWatchList(long machineid, RejectedMessage entry) {
+               HashSet<RejectedMessage> entries=watchlist.get(machineid);
+               if (entries == null)
+                       watchlist.put(machineid, entries=new HashSet<RejectedMessage>());
+               entries.add(entry);
        }
 
        private void processEntry(TableStatus entry, SlotIndexer indexer) {
@@ -271,6 +300,20 @@ search:
 
        private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
                machineSet.remove(machineid);
+
+               HashSet<RejectedMessage> watchset=watchlist.get(machineid);
+               if (watchset != null) {
+                       for(Iterator<RejectedMessage> rmit=watchset.iterator(); rmit.hasNext(); ) {
+                               RejectedMessage rm=rmit.next();
+                               if (rm.getNewSeqNum() <= seqnum) {
+                                       /* Remove it from our watchlist */
+                                       rmit.remove();
+                                       /* Decrement machines that need to see this notification */
+                                       rm.removeWatcher(machineid);
+                               }
+                       }
+               }
+
                Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
                if (lastmsgentry == null)
                        return;
index dfd71014cc0972c67ba524e7d1665247f95b0306..07e678829ea649acf75407b0f9ccd81d8ce552fb 100644 (file)
@@ -1,3 +1,2 @@
-1) check expiration of rejectedmessage entries
-2) add crypto
-3) handle Salt
+1) add crypto
+2) handle Salt