Fixed Rejected Messages, Calculating correct size
[iotcloud.git] / version2 / src / java / iotcloud / Table.java
index e0d26cdc98d1783fa2b126466d1f82838ea2c469..f29b540941b3452e9d9668e789b4b1e35e4eb0e8 100644 (file)
@@ -20,8 +20,7 @@ import java.nio.ByteBuffer;
  */
 
 final public class Table {
-
-
+       
        /* Constants */
        static final int FREE_SLOTS = 10; // Number of slots that should be kept free
        static final int SKIP_THRESHOLD = 10;
@@ -45,14 +44,17 @@ final public class Table {
        private long oldestLiveSlotSequenceNumver = 0;  // Smallest sequence number of the slot with a live entry
        private long localMachineId = 0; // Machine ID of this client device
        private long sequenceNumber = 0; // Largest sequence number a client has received
-       private int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
-       private int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
+       // private int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
+       // private int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
        private long localTransactionSequenceNumber = 0; // Local sequence number counter for transactions
        private long lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on
        private long oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on
        private long localArbitrationSequenceNumber = 0;
        private boolean hadPartialSendToServer = false;
        private boolean attemptedToSendToServer = false;
+       private long expectedsize;
+       private boolean didFindTableStatus = false;
+       private long currMaxSize = 0;
 
        /* Data Structures  */
        private Map<IoTString, KeyValue> committedKeyValueTable = null; // Table of committed key value pairs
@@ -1029,7 +1031,7 @@ final public class Table {
                        long old_seqn = rejectedSlotList.firstElement();
                        if (rejectedSlotList.size() > REJECTED_THRESHOLD) {
                                long new_seqn = rejectedSlotList.lastElement();
-                               RejectedMessage rm = new RejectedMessage(s, localMachineId, old_seqn, new_seqn, false);
+                               RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
                                s.addEntry(rm);
                        } else {
                                long prev_seqn = -1;
@@ -1044,7 +1046,7 @@ final public class Table {
                                }
                                /* Generate rejected message entry for missing messages */
                                if (prev_seqn != -1) {
-                                       RejectedMessage rm = new RejectedMessage(s, localMachineId, old_seqn, prev_seqn, false);
+                                       RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
                                        s.addEntry(rm);
                                }
                                /* Generate rejected message entries for present messages */
@@ -1052,7 +1054,7 @@ final public class Table {
                                        long curr_seqn = rejectedSlotList.get(i);
                                        Slot s_msg = buffer.getSlot(curr_seqn);
                                        long machineid = s_msg.getMachineID();
-                                       RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
+                                       RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
                                        s.addEntry(rm);
                                }
                        }
@@ -1148,11 +1150,6 @@ final public class Table {
                        return;
                }
 
-               // Reset the table status declared sizes
-               smallestTableStatusSeen = -1;
-               largestTableStatusSeen = -1;
-
-
                // Make sure all slots are newer than the last largest slot this client has seen
                long firstSeqNum = newSlots[0].getSequenceNumber();
                if (firstSeqNum <= sequenceNumber) {
@@ -1172,6 +1169,7 @@ final public class Table {
                // Process each slots data
                for (Slot slot : newSlots) {
                        processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
+                       updateExpectedSize();
                }
 
                // If there is a gap, check to see if the server sent us everything.
@@ -1243,6 +1241,26 @@ final public class Table {
                updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
        }
 
+
+       
+       private void initExpectedSize(long firstSequenceNumber, long numberOfSlots) {
+               if (didFindTableStatus) {
+                       return;
+               }
+               long prevslots = firstSequenceNumber;
+               expectedsize = (prevslots < ((long) numberOfSlots)) ? (int) prevslots : numberOfSlots;
+               currMaxSize = numberOfSlots;
+       }
+
+       private void updateExpectedSize() {
+               expectedsize++;
+               if (expectedsize > currMaxSize)
+               {
+                       expectedsize = currMaxSize;
+               }
+       }
+
+
        /**
         * Check the size of the block chain to make sure there are enough slots sent back by the server.
         * This is only called when we have a gap between the slots that we have locally and the slots
@@ -1250,41 +1268,29 @@ final public class Table {
         * status message
         */
        private void checkNumSlots(int numberOfSlots) {
-
-               // We only have 1 size so we must have this many slots
-               if (largestTableStatusSeen == smallestTableStatusSeen) {
-                       if (numberOfSlots != smallestTableStatusSeen) {
-                               throw new Error("Server Error: Server did not send all slots.  Expected: " + smallestTableStatusSeen + " Received:" + numberOfSlots);
-                       }
-               } else {
-                       // We have more than 1
-                       if (numberOfSlots < smallestTableStatusSeen) {
-                               throw new Error("Server Error: Server did not send all slots.  Expected at least: " + smallestTableStatusSeen + " Received:" + numberOfSlots);
-                       }
+               if (numberOfSlots != expectedsize) {
+                       throw new Error("Server Error: Server did not send all slots.  Expected: " + expectedsize + " Received:" + numberOfSlots);
                }
        }
 
+       private void updateCurrMaxSize(int newmaxsize) {
+               currMaxSize=newmaxsize;
+       }
+
+
        /**
         * Update the size of of the local buffer if it is needed.
         */
        private void commitNewMaxSize() {
-
-               int currMaxSize = 0;
-
-               if (largestTableStatusSeen == -1) {
-                       // No table status seen so the current max size does not change
-                       currMaxSize = numberOfSlots;
-               } else {
-                       currMaxSize = largestTableStatusSeen;
-               }
+               didFindTableStatus = false;             
 
                // Resize the local slot buffer
                if (numberOfSlots != currMaxSize) {
-                       buffer.resize(currMaxSize);
+                       buffer.resize((int)currMaxSize);
                }
 
                // Change the number of local slots to the new size
-               numberOfSlots = currMaxSize;
+               numberOfSlots = (int)currMaxSize;
 
                // Recalculate the resize threshold since the size of the local buffer has changed
                setResizeThreshold();
@@ -2014,7 +2020,7 @@ final public class Table {
                                break;
 
                        case Entry.TypeTableStatus:
-                               processEntry((TableStatus)entry);
+                               processEntry((TableStatus)entry, slot.getSequenceNumber());
                                break;
 
                        default:
@@ -2052,8 +2058,11 @@ final public class Table {
         * keeps track of the largest and smallest table status seen in this current round
         * of updating the local copy of the block chain
         */
-       private void processEntry(TableStatus entry) {
+       private void processEntry(TableStatus entry, long seq) {
                int newNumSlots = entry.getMaxSlots();
+               updateCurrMaxSize(newNumSlots);
+
+               initExpectedSize(seq, newNumSlots);
 
                if (liveTableStatus != null) {
                        // We have a larger table status so the old table status is no longer alive
@@ -2062,14 +2071,6 @@ final public class Table {
 
                // Make this new table status the latest alive table status
                liveTableStatus = entry;
-
-               if ((smallestTableStatusSeen == -1) || (newNumSlots < smallestTableStatusSeen)) {
-                       smallestTableStatusSeen = newNumSlots;
-               }
-
-               if ((largestTableStatusSeen == -1) || (newNumSlots > largestTableStatusSeen)) {
-                       largestTableStatusSeen = newNumSlots;
-               }
        }
 
        /**
@@ -2080,6 +2081,7 @@ final public class Table {
                long newSeqNum = entry.getNewSeqNum();
                boolean isequal = entry.getEqual();
                long machineId = entry.getMachineID();
+               long seq = entry.getSequenceNumber();
 
 
                // Check if we have messages that were supposed to be rejected in our local block chain
@@ -2115,7 +2117,7 @@ final public class Table {
                        Pair<Long, Liveness> lastMessageValue = lastMessageEntry.getValue();
                        long entrySequenceNumber = lastMessageValue.getFirst();
 
-                       if (entrySequenceNumber < newSeqNum) {
+                       if (entrySequenceNumber < seq) {
 
                                // Add this rejected message to the set of messages that this machine ID did not see yet
                                addWatchList(lastMessageEntryMachineId, entry);
@@ -2276,7 +2278,7 @@ final public class Table {
                                RejectedMessage rm = rmit.next();
 
                                // If this machine Id has seen this rejected message...
-                               if (rm.getNewSeqNum() <= seqNum) {
+                               if (rm.getSequenceNumber() <= seqNum) {
 
                                        // Remove it from our watchlist
                                        rmit.remove();