add check for missing messages
authorbdemsky <bdemsky@uci.edu>
Mon, 25 Jul 2016 01:11:20 +0000 (18:11 -0700)
committerbdemsky <bdemsky@uci.edu>
Mon, 25 Jul 2016 01:11:20 +0000 (18:11 -0700)
src/java/iotcloud/Table.java
src/java/iotcloud/issues.txt

index 28434c5..05a85f8 100644 (file)
@@ -1,5 +1,6 @@
 package iotcloud;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Arrays;
 import java.util.Vector;
 
@@ -177,15 +178,20 @@ search:
                SlotIndexer indexer = new SlotIndexer(newslots, buffer);
                checkHMACChain(indexer, newslots);
 
+               HashSet<Long> machineSet=new HashSet<Long>(lastmessagetable.keySet());
+               
                initExpectedSize();
                for(Slot slot: newslots) {
                        updateExpectedSize();
-                       processSlot(indexer, slot, acceptupdatestolocal);
+                       processSlot(indexer, slot, acceptupdatestolocal, machineSet);
                }
 
                /* If there is a gap, check to see if the server sent us everything. */
-               if (firstseqnum != (sequencenumber+1))
+               if (firstseqnum != (sequencenumber+1)) {
                        checkNumSlots(newslots.length);
+                       if (!machineSet.isEmpty())
+                               throw new Error("Missing record for machines: "+machineSet);
+               }
 
                commitNewMaxSize();
 
@@ -235,8 +241,8 @@ search:
                table.put(key, entry);
        }
 
-       private void processEntry(LastMessage entry, SlotIndexer indexer) {
-               updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false);
+       private void processEntry(LastMessage entry, SlotIndexer indexer, HashSet<Long> machineSet) {
+               updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
        }
 
        private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
@@ -263,7 +269,8 @@ search:
                lastTableStatus = entry;
        }
 
-       private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal) {
+       private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
+               machineSet.remove(machineid);
                Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
                if (lastmsgentry == null)
                        return;
@@ -287,8 +294,8 @@ search:
                }
        }
 
-       private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal) {
-               updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal);
+       private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
+               updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
 
                for(Entry entry : slot.getEntries()) {
                        switch(entry.getType()) {
@@ -297,7 +304,7 @@ search:
                                break;
 
                        case Entry.TypeLastMessage:
-                               processEntry((LastMessage)entry, indexer);
+                               processEntry((LastMessage)entry, indexer, machineSet);
                                break;
 
                        case Entry.TypeRejectedMessage:
index da58ceb..dfd7101 100644 (file)
@@ -1,4 +1,3 @@
 1) check expiration of rejectedmessage entries
-2) check missing machine messages
-3) add crypto
-4) handle Salt
+2) add crypto
+3) handle Salt