Fixed Rejected Messages, Calculating correct size
authorAli Younis <ayounis@uci.edu>
Fri, 3 Feb 2017 00:36:48 +0000 (16:36 -0800)
committerAli Younis <ayounis@uci.edu>
Fri, 3 Feb 2017 00:36:48 +0000 (16:36 -0800)
version2/src/java/iotcloud/RejectedMessage.java
version2/src/java/iotcloud/Table.java
version2/src/java/iotcloud/Test.java

index 9c84f18e053720b43de5e617684ba512d828b2fd..741f92f6f04714e51d6d2ebdd73c31fdd62aefa4 100644 (file)
@@ -12,6 +12,10 @@ import java.util.HashSet;
 
 
 class RejectedMessage extends Entry {
 
 
 class RejectedMessage extends Entry {
+       /* Sequence number */
+       private long sequencenum;
+       
+
        /* Machine identifier */
        private long machineid;
        /* Oldest sequence number in range */
        /* Machine identifier */
        private long machineid;
        /* Oldest sequence number in range */
@@ -24,8 +28,9 @@ class RejectedMessage extends Entry {
        /* Set of machines that have not received notification. */
        private HashSet<Long> watchset;
 
        /* Set of machines that have not received notification. */
        private HashSet<Long> watchset;
 
-       RejectedMessage(Slot slot, long _machineid, long _oldseqnum, long _newseqnum, boolean _equalto) {
+       RejectedMessage(Slot slot, long _sequencenum, long _machineid, long _oldseqnum, long _newseqnum, boolean _equalto) {
                super(slot);
                super(slot);
+               sequencenum = _sequencenum;
                machineid=_machineid;
                oldseqnum=_oldseqnum;
                newseqnum=_newseqnum;
                machineid=_machineid;
                oldseqnum=_oldseqnum;
                newseqnum=_newseqnum;
@@ -48,12 +53,18 @@ class RejectedMessage extends Entry {
                return machineid;
        }
 
                return machineid;
        }
 
+
+       long getSequenceNumber() {
+               return sequencenum;
+       }
+
        static Entry decode(Slot slot, ByteBuffer bb) {
        static Entry decode(Slot slot, ByteBuffer bb) {
+               long sequencenum=bb.getLong();
                long machineid=bb.getLong();
                long oldseqnum=bb.getLong();
                long newseqnum=bb.getLong();
                byte equalto=bb.get();
                long machineid=bb.getLong();
                long oldseqnum=bb.getLong();
                long newseqnum=bb.getLong();
                byte equalto=bb.get();
-               return new RejectedMessage(slot, machineid, oldseqnum, newseqnum, equalto==1);
+               return new RejectedMessage(slot,sequencenum, machineid, oldseqnum, newseqnum, equalto==1);
        }
 
        void setWatchSet(HashSet<Long> _watchset) {
        }
 
        void setWatchSet(HashSet<Long> _watchset) {
@@ -68,6 +79,7 @@ class RejectedMessage extends Entry {
 
        void encode(ByteBuffer bb) {
                bb.put(Entry.TypeRejectedMessage);
 
        void encode(ByteBuffer bb) {
                bb.put(Entry.TypeRejectedMessage);
+               bb.putLong(sequencenum);
                bb.putLong(machineid);
                bb.putLong(oldseqnum);
                bb.putLong(newseqnum);
                bb.putLong(machineid);
                bb.putLong(oldseqnum);
                bb.putLong(newseqnum);
@@ -75,7 +87,7 @@ class RejectedMessage extends Entry {
        }
 
        int getSize() {
        }
 
        int getSize() {
-               return 3*Long.BYTES + 2*Byte.BYTES;
+               return 4*Long.BYTES + 2*Byte.BYTES;
        }
 
        byte getType() {
        }
 
        byte getType() {
@@ -83,6 +95,6 @@ class RejectedMessage extends Entry {
        }
        
        Entry getCopy(Slot s) {
        }
        
        Entry getCopy(Slot s) {
-               return new RejectedMessage(s, machineid, oldseqnum, newseqnum, equalto);
+               return new RejectedMessage(s,sequencenum, machineid, oldseqnum, newseqnum, equalto);
        }
 }
        }
 }
index e0d26cdc98d1783fa2b126466d1f82838ea2c469..f29b540941b3452e9d9668e789b4b1e35e4eb0e8 100644 (file)
@@ -20,8 +20,7 @@ import java.nio.ByteBuffer;
  */
 
 final public class Table {
  */
 
 final public class Table {
-
-
+       
        /* Constants */
        static final int FREE_SLOTS = 10; // Number of slots that should be kept free
        static final int SKIP_THRESHOLD = 10;
        /* Constants */
        static final int FREE_SLOTS = 10; // Number of slots that should be kept free
        static final int SKIP_THRESHOLD = 10;
@@ -45,14 +44,17 @@ final public class Table {
        private long oldestLiveSlotSequenceNumver = 0;  // Smallest sequence number of the slot with a live entry
        private long localMachineId = 0; // Machine ID of this client device
        private long sequenceNumber = 0; // Largest sequence number a client has received
        private long oldestLiveSlotSequenceNumver = 0;  // Smallest sequence number of the slot with a live entry
        private long localMachineId = 0; // Machine ID of this client device
        private long sequenceNumber = 0; // Largest sequence number a client has received
-       private int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
-       private int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
+       // private int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
+       // private int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
        private long localTransactionSequenceNumber = 0; // Local sequence number counter for transactions
        private long lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on
        private long oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on
        private long localArbitrationSequenceNumber = 0;
        private boolean hadPartialSendToServer = false;
        private boolean attemptedToSendToServer = false;
        private long localTransactionSequenceNumber = 0; // Local sequence number counter for transactions
        private long lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on
        private long oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on
        private long localArbitrationSequenceNumber = 0;
        private boolean hadPartialSendToServer = false;
        private boolean attemptedToSendToServer = false;
+       private long expectedsize;
+       private boolean didFindTableStatus = false;
+       private long currMaxSize = 0;
 
        /* Data Structures  */
        private Map<IoTString, KeyValue> committedKeyValueTable = null; // Table of committed key value pairs
 
        /* Data Structures  */
        private Map<IoTString, KeyValue> committedKeyValueTable = null; // Table of committed key value pairs
@@ -1029,7 +1031,7 @@ final public class Table {
                        long old_seqn = rejectedSlotList.firstElement();
                        if (rejectedSlotList.size() > REJECTED_THRESHOLD) {
                                long new_seqn = rejectedSlotList.lastElement();
                        long old_seqn = rejectedSlotList.firstElement();
                        if (rejectedSlotList.size() > REJECTED_THRESHOLD) {
                                long new_seqn = rejectedSlotList.lastElement();
-                               RejectedMessage rm = new RejectedMessage(s, localMachineId, old_seqn, new_seqn, false);
+                               RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
                                s.addEntry(rm);
                        } else {
                                long prev_seqn = -1;
                                s.addEntry(rm);
                        } else {
                                long prev_seqn = -1;
@@ -1044,7 +1046,7 @@ final public class Table {
                                }
                                /* Generate rejected message entry for missing messages */
                                if (prev_seqn != -1) {
                                }
                                /* Generate rejected message entry for missing messages */
                                if (prev_seqn != -1) {
-                                       RejectedMessage rm = new RejectedMessage(s, localMachineId, old_seqn, prev_seqn, false);
+                                       RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
                                        s.addEntry(rm);
                                }
                                /* Generate rejected message entries for present messages */
                                        s.addEntry(rm);
                                }
                                /* Generate rejected message entries for present messages */
@@ -1052,7 +1054,7 @@ final public class Table {
                                        long curr_seqn = rejectedSlotList.get(i);
                                        Slot s_msg = buffer.getSlot(curr_seqn);
                                        long machineid = s_msg.getMachineID();
                                        long curr_seqn = rejectedSlotList.get(i);
                                        Slot s_msg = buffer.getSlot(curr_seqn);
                                        long machineid = s_msg.getMachineID();
-                                       RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
+                                       RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
                                        s.addEntry(rm);
                                }
                        }
                                        s.addEntry(rm);
                                }
                        }
@@ -1148,11 +1150,6 @@ final public class Table {
                        return;
                }
 
                        return;
                }
 
-               // Reset the table status declared sizes
-               smallestTableStatusSeen = -1;
-               largestTableStatusSeen = -1;
-
-
                // Make sure all slots are newer than the last largest slot this client has seen
                long firstSeqNum = newSlots[0].getSequenceNumber();
                if (firstSeqNum <= sequenceNumber) {
                // Make sure all slots are newer than the last largest slot this client has seen
                long firstSeqNum = newSlots[0].getSequenceNumber();
                if (firstSeqNum <= sequenceNumber) {
@@ -1172,6 +1169,7 @@ final public class Table {
                // Process each slots data
                for (Slot slot : newSlots) {
                        processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
                // Process each slots data
                for (Slot slot : newSlots) {
                        processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
+                       updateExpectedSize();
                }
 
                // If there is a gap, check to see if the server sent us everything.
                }
 
                // If there is a gap, check to see if the server sent us everything.
@@ -1243,6 +1241,26 @@ final public class Table {
                updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
        }
 
                updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
        }
 
+
+       
+       private void initExpectedSize(long firstSequenceNumber, long numberOfSlots) {
+               if (didFindTableStatus) {
+                       return;
+               }
+               long prevslots = firstSequenceNumber;
+               expectedsize = (prevslots < ((long) numberOfSlots)) ? (int) prevslots : numberOfSlots;
+               currMaxSize = numberOfSlots;
+       }
+
+       private void updateExpectedSize() {
+               expectedsize++;
+               if (expectedsize > currMaxSize)
+               {
+                       expectedsize = currMaxSize;
+               }
+       }
+
+
        /**
         * Check the size of the block chain to make sure there are enough slots sent back by the server.
         * This is only called when we have a gap between the slots that we have locally and the slots
        /**
         * Check the size of the block chain to make sure there are enough slots sent back by the server.
         * This is only called when we have a gap between the slots that we have locally and the slots
@@ -1250,41 +1268,29 @@ final public class Table {
         * status message
         */
        private void checkNumSlots(int numberOfSlots) {
         * status message
         */
        private void checkNumSlots(int numberOfSlots) {
-
-               // We only have 1 size so we must have this many slots
-               if (largestTableStatusSeen == smallestTableStatusSeen) {
-                       if (numberOfSlots != smallestTableStatusSeen) {
-                               throw new Error("Server Error: Server did not send all slots.  Expected: " + smallestTableStatusSeen + " Received:" + numberOfSlots);
-                       }
-               } else {
-                       // We have more than 1
-                       if (numberOfSlots < smallestTableStatusSeen) {
-                               throw new Error("Server Error: Server did not send all slots.  Expected at least: " + smallestTableStatusSeen + " Received:" + numberOfSlots);
-                       }
+               if (numberOfSlots != expectedsize) {
+                       throw new Error("Server Error: Server did not send all slots.  Expected: " + expectedsize + " Received:" + numberOfSlots);
                }
        }
 
                }
        }
 
+       private void updateCurrMaxSize(int newmaxsize) {
+               currMaxSize=newmaxsize;
+       }
+
+
        /**
         * Update the size of of the local buffer if it is needed.
         */
        private void commitNewMaxSize() {
        /**
         * Update the size of of the local buffer if it is needed.
         */
        private void commitNewMaxSize() {
-
-               int currMaxSize = 0;
-
-               if (largestTableStatusSeen == -1) {
-                       // No table status seen so the current max size does not change
-                       currMaxSize = numberOfSlots;
-               } else {
-                       currMaxSize = largestTableStatusSeen;
-               }
+               didFindTableStatus = false;             
 
                // Resize the local slot buffer
                if (numberOfSlots != currMaxSize) {
 
                // Resize the local slot buffer
                if (numberOfSlots != currMaxSize) {
-                       buffer.resize(currMaxSize);
+                       buffer.resize((int)currMaxSize);
                }
 
                // Change the number of local slots to the new size
                }
 
                // Change the number of local slots to the new size
-               numberOfSlots = currMaxSize;
+               numberOfSlots = (int)currMaxSize;
 
                // Recalculate the resize threshold since the size of the local buffer has changed
                setResizeThreshold();
 
                // Recalculate the resize threshold since the size of the local buffer has changed
                setResizeThreshold();
@@ -2014,7 +2020,7 @@ final public class Table {
                                break;
 
                        case Entry.TypeTableStatus:
                                break;
 
                        case Entry.TypeTableStatus:
-                               processEntry((TableStatus)entry);
+                               processEntry((TableStatus)entry, slot.getSequenceNumber());
                                break;
 
                        default:
                                break;
 
                        default:
@@ -2052,8 +2058,11 @@ final public class Table {
         * keeps track of the largest and smallest table status seen in this current round
         * of updating the local copy of the block chain
         */
         * keeps track of the largest and smallest table status seen in this current round
         * of updating the local copy of the block chain
         */
-       private void processEntry(TableStatus entry) {
+       private void processEntry(TableStatus entry, long seq) {
                int newNumSlots = entry.getMaxSlots();
                int newNumSlots = entry.getMaxSlots();
+               updateCurrMaxSize(newNumSlots);
+
+               initExpectedSize(seq, newNumSlots);
 
                if (liveTableStatus != null) {
                        // We have a larger table status so the old table status is no longer alive
 
                if (liveTableStatus != null) {
                        // We have a larger table status so the old table status is no longer alive
@@ -2062,14 +2071,6 @@ final public class Table {
 
                // Make this new table status the latest alive table status
                liveTableStatus = entry;
 
                // Make this new table status the latest alive table status
                liveTableStatus = entry;
-
-               if ((smallestTableStatusSeen == -1) || (newNumSlots < smallestTableStatusSeen)) {
-                       smallestTableStatusSeen = newNumSlots;
-               }
-
-               if ((largestTableStatusSeen == -1) || (newNumSlots > largestTableStatusSeen)) {
-                       largestTableStatusSeen = newNumSlots;
-               }
        }
 
        /**
        }
 
        /**
@@ -2080,6 +2081,7 @@ final public class Table {
                long newSeqNum = entry.getNewSeqNum();
                boolean isequal = entry.getEqual();
                long machineId = entry.getMachineID();
                long newSeqNum = entry.getNewSeqNum();
                boolean isequal = entry.getEqual();
                long machineId = entry.getMachineID();
+               long seq = entry.getSequenceNumber();
 
 
                // Check if we have messages that were supposed to be rejected in our local block chain
 
 
                // Check if we have messages that were supposed to be rejected in our local block chain
@@ -2115,7 +2117,7 @@ final public class Table {
                        Pair<Long, Liveness> lastMessageValue = lastMessageEntry.getValue();
                        long entrySequenceNumber = lastMessageValue.getFirst();
 
                        Pair<Long, Liveness> lastMessageValue = lastMessageEntry.getValue();
                        long entrySequenceNumber = lastMessageValue.getFirst();
 
-                       if (entrySequenceNumber < newSeqNum) {
+                       if (entrySequenceNumber < seq) {
 
                                // Add this rejected message to the set of messages that this machine ID did not see yet
                                addWatchList(lastMessageEntryMachineId, entry);
 
                                // Add this rejected message to the set of messages that this machine ID did not see yet
                                addWatchList(lastMessageEntryMachineId, entry);
@@ -2276,7 +2278,7 @@ final public class Table {
                                RejectedMessage rm = rmit.next();
 
                                // If this machine Id has seen this rejected message...
                                RejectedMessage rm = rmit.next();
 
                                // If this machine Id has seen this rejected message...
-                               if (rm.getNewSeqNum() <= seqNum) {
+                               if (rm.getSequenceNumber() <= seqNum) {
 
                                        // Remove it from our watchlist
                                        rmit.remove();
 
                                        // Remove it from our watchlist
                                        rmit.remove();
index d6598628b86c7ce7551d97a6e7bc540d77b566a1..bfe0f871cf73ec17b4f943977e8a55b32e1afc65 100644 (file)
@@ -11,7 +11,7 @@ import java.util.ArrayList;
 
 public class Test {
 
 
 public class Test {
 
-    public static final  int NUMBER_OF_TESTS = 10;
+    public static final  int NUMBER_OF_TESTS = 1000;
 
     public static void main(String[] args)  throws ServerException {
         if (args[0].equals("2")) {
 
     public static void main(String[] args)  throws ServerException {
         if (args[0].equals("2")) {