From 926a6eec669fc73d92e49a9cf7b993dced9b50ec Mon Sep 17 00:00:00 2001 From: bdemsky Date: Sun, 24 Jul 2016 18:37:31 -0700 Subject: [PATCH] edits --- src/java/iotcloud/RejectedMessage.java | 13 ++++++++ src/java/iotcloud/Table.java | 45 +++++++++++++++++++++++++- src/java/iotcloud/issues.txt | 5 ++- 3 files changed, 59 insertions(+), 4 deletions(-) diff --git a/src/java/iotcloud/RejectedMessage.java b/src/java/iotcloud/RejectedMessage.java index 0f0bfd0..0e8ec4f 100644 --- a/src/java/iotcloud/RejectedMessage.java +++ b/src/java/iotcloud/RejectedMessage.java @@ -1,5 +1,6 @@ package iotcloud; import java.nio.ByteBuffer; +import java.util.HashSet; /** * Entry for tracking messages that the server rejected. We have to @@ -20,6 +21,8 @@ class RejectedMessage extends Entry { /* Is the machine identifier of the relevant slots equal to (or not * equal to) the specified machine identifier. */ private boolean equalto; + /* Set of machines that have not received notification. */ + private HashSet watchset; RejectedMessage(Slot slot, long _machineid, long _oldseqnum, long _newseqnum, boolean _equalto) { super(slot); @@ -53,6 +56,16 @@ class RejectedMessage extends Entry { return new RejectedMessage(slot, machineid, oldseqnum, newseqnum, equalto==1); } + void setWatchSet(HashSet _watchset) { + watchset=_watchset; + } + + void removeWatcher(long machineid) { + if (watchset.remove(machineid)) + if (watchset.isEmpty()) + setDead(); + } + void encode(ByteBuffer bb) { bb.put(Entry.TypeRejectedMessage); bb.putLong(machineid); diff --git a/src/java/iotcloud/Table.java b/src/java/iotcloud/Table.java index 05a85f8..bd9e15b 100644 --- a/src/java/iotcloud/Table.java +++ b/src/java/iotcloud/Table.java @@ -1,5 +1,7 @@ package iotcloud; import java.util.HashMap; +import java.util.Map; +import java.util.Iterator; import java.util.HashSet; import java.util.Arrays; import java.util.Vector; @@ -15,6 +17,7 @@ final public class Table { private int numslots; private HashMap table=new HashMap(); private HashMap > lastmessagetable=new HashMap >(); + private HashMap > watchlist = new HashMap >(); private SlotBuffer buffer; private CloudComm cloud; private long sequencenumber; @@ -179,7 +182,7 @@ search: checkHMACChain(indexer, newslots); HashSet machineSet=new HashSet(lastmessagetable.keySet()); - + initExpectedSize(); for(Slot slot: newslots) { updateExpectedSize(); @@ -259,6 +262,32 @@ search: } } } + + HashSet watchset=new HashSet(); + for(Map.Entry > lastmsg_entry:lastmessagetable.entrySet()) { + long entry_mid=lastmsg_entry.getKey(); + /* We've seen it, don't need to continue to watch. Our next + * message will implicitly acknowledge it. */ + if (entry_mid == localmachineid) + continue; + Pair v=lastmsg_entry.getValue(); + long entry_seqn=v.getFirst(); + if (entry_seqn < newseqnum) { + addWatchList(entry_mid, entry); + watchset.add(entry_mid); + } + } + if (watchset.isEmpty()) + entry.setDead(); + else + entry.setWatchSet(watchset); + } + + private void addWatchList(long machineid, RejectedMessage entry) { + HashSet entries=watchlist.get(machineid); + if (entries == null) + watchlist.put(machineid, entries=new HashSet()); + entries.add(entry); } private void processEntry(TableStatus entry, SlotIndexer indexer) { @@ -271,6 +300,20 @@ search: private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet machineSet) { machineSet.remove(machineid); + + HashSet watchset=watchlist.get(machineid); + if (watchset != null) { + for(Iterator rmit=watchset.iterator(); rmit.hasNext(); ) { + RejectedMessage rm=rmit.next(); + if (rm.getNewSeqNum() <= seqnum) { + /* Remove it from our watchlist */ + rmit.remove(); + /* Decrement machines that need to see this notification */ + rm.removeWatcher(machineid); + } + } + } + Pair lastmsgentry = lastmessagetable.put(machineid, new Pair(seqnum, liveness)); if (lastmsgentry == null) return; diff --git a/src/java/iotcloud/issues.txt b/src/java/iotcloud/issues.txt index dfd7101..07e6788 100644 --- a/src/java/iotcloud/issues.txt +++ b/src/java/iotcloud/issues.txt @@ -1,3 +1,2 @@ -1) check expiration of rejectedmessage entries -2) add crypto -3) handle Salt +1) add crypto +2) handle Salt -- 2.34.1