edits
[iotcloud.git] / version2 / src / java / iotcloud / Table.java
index c254939a25d6db663ff7808231a6b1a490cf8a1e..25b677565f89d8db56c69e4ed30c4d52e1ba6e9c 100644 (file)
@@ -21,9 +21,8 @@ import java.nio.ByteBuffer;
 
 final public class Table {
 
-
        /* Constants */
-       static final int FREE_SLOTS = 10; // Number of slots that should be kept free
+       static final int FREE_SLOTS = 2; // Number of slots that should be kept free // 10
        static final int SKIP_THRESHOLD = 10;
        static final double RESIZE_MULTIPLE = 1.2;
        static final double RESIZE_THRESHOLD = 0.75;
@@ -45,14 +44,27 @@ final public class Table {
        private long oldestLiveSlotSequenceNumver = 0;  // Smallest sequence number of the slot with a live entry
        private long localMachineId = 0; // Machine ID of this client device
        private long sequenceNumber = 0; // Largest sequence number a client has received
-       private int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
-       private int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
+       private long localSequenceNumber = 0;
+
+       // private int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
+       // private int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
        private long localTransactionSequenceNumber = 0; // Local sequence number counter for transactions
        private long lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on
        private long oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on
        private long localArbitrationSequenceNumber = 0;
        private boolean hadPartialSendToServer = false;
        private boolean attemptedToSendToServer = false;
+       private long expectedsize;
+       private boolean didFindTableStatus = false;
+       private long currMaxSize = 0;
+
+       private Slot lastSlotAttemptedToSend = null;
+       private boolean lastIsNewKey = false;
+       private int lastNewSize = 0;
+       private Map<Transaction, List<Integer>> lastTransactionPartsSent = null;
+       private List<Entry> lastPendingSendArbitrationEntriesToDelete = null;
+       private NewKey lastNewKey = null;
+
 
        /* Data Structures  */
        private Map<IoTString, KeyValue> committedKeyValueTable = null; // Table of committed key value pairs
@@ -84,9 +96,6 @@ final public class Table {
        private Map<Long, Long> lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = null;
 
 
-
-
-
        public Table(String baseurl, String password, long _localMachineId, int listeningPort) {
                localMachineId = _localMachineId;
                cloud = new CloudComm(this, baseurl, password, listeningPort);
@@ -159,14 +168,34 @@ final public class Table {
 
                int livec = 0;
                int deadc = 0;
+
+               int casdasd = 0;
+
+               int liveslo = 0;
+
                for (long i = o; i < (n + 1); i++) {
                        Slot s = buffer.getSlot(i);
 
+
+                       if (s.isLive()) {
+                               liveslo++;
+                       }
+
                        Vector<Entry> entries = s.getEntries();
 
                        for (Entry e : entries) {
                                if (e.isLive()) {
                                        int type = e.getType();
+
+
+                                       if (type == 6) {
+                                               RejectedMessage rej = (RejectedMessage)e;
+                                               casdasd++;
+
+                                               System.out.println(rej.getMachineID());
+                                       }
+
+
                                        types[type] = types[type] + 1;
                                        num++;
                                        livec++;
@@ -180,47 +209,31 @@ final public class Table {
                        System.out.println(i + "    " + types[i]);
                }
                System.out.println("Live count:   " + livec);
+               System.out.println("Live Slot count:   " + liveslo);
+
                System.out.println("Dead count:   " + deadc);
                System.out.println("Old:   " + o);
                System.out.println("New:   " + n);
                System.out.println("Size:   " + buffer.size());
+               // System.out.println("Commits:   " + liveCommitsTable.size());
+               System.out.println("pendingTrans:   " + pendingTransactionQueue.size());
+               System.out.println("Trans Status Out:   " + outstandingTransactionStatus.size());
 
-               // List<IoTString> strList = new ArrayList<IoTString>();
-               // for (int i = 0; i < 100; i++) {
-               //      String keyA = "a" + i;
-               //      String keyB = "b" + i;
-               //      String keyC = "c" + i;
-               //      String keyD = "d" + i;
-
-               //      IoTString iKeyA = new IoTString(keyA);
-               //      IoTString iKeyB = new IoTString(keyB);
-               //      IoTString iKeyC = new IoTString(keyC);
-               //      IoTString iKeyD = new IoTString(keyD);
-
-               //      strList.add(iKeyA);
-               //      strList.add(iKeyB);
-               //      strList.add(iKeyC);
-               //      strList.add(iKeyD);
-               // }
-
+               for (Long k : lastArbitratedTransactionNumberByArbitratorTable.keySet()) {
+                       System.out.println(k + ": " + lastArbitratedTransactionNumberByArbitratorTable.get(k));
+               }
 
-               // for (Long l : commitMap.keySet()) {
-               //      for (Long l2 : commitMap.get(l).keySet()) {
-               //              for (KeyValue kv : commitMap.get(l).get(l2).getkeyValueUpdateSet()) {
-               //                      strList.remove(kv.getKey());
-               //                      System.out.print(kv.getKey() + "    ");
-               //              }
-               //      }
-               // }
 
-               // System.out.println();
-               // System.out.println();
+               for (Long a : liveCommitsTable.keySet()) {
+                       for (Long b : liveCommitsTable.get(a).keySet()) {
+                               for (KeyValue kv : liveCommitsTable.get(a).get(b).getKeyValueUpdateSet()) {
+                                       System.out.print(kv + " ");
+                               }
+                               System.out.print("|| ");
+                       }
+                       System.out.println();
+               }
 
-               // for (IoTString s : strList) {
-               //      System.out.print(s + "    ");
-               // }
-               // System.out.println();
-               // System.out.println(strList.size());
        }
 
        /**
@@ -231,7 +244,8 @@ final public class Table {
                cloud.initSecurity();
 
                // Create the first insertion into the block chain which is the table status
-               Slot s = new Slot(this, 1, localMachineId);
+               Slot s = new Slot(this, 1, localMachineId, localSequenceNumber);
+               localSequenceNumber++;
                TableStatus status = new TableStatus(s, numberOfSlots);
                s.addEntry(status);
                Slot[] array = cloud.putSlot(s, numberOfSlots);
@@ -255,21 +269,24 @@ final public class Table {
                // Just pull the latest slots from the server
                Slot[] newslots = cloud.getSlots(sequenceNumber + 1);
                validateAndUpdate(newslots, true);
+               sendToServer(null);
+               updateLiveTransactionsAndStatus();
+
        }
 
-// public String toString() {
-//     String retString = " Committed Table: \n";
-//     retString += "---------------------------\n";
-//     retString += commitedTable.toString();
+       // public String toString() {
+       //      String retString = " Committed Table: \n";
+       //      retString += "---------------------------\n";
+       //      retString += commitedTable.toString();
 
-//     retString += "\n\n";
+       //      retString += "\n\n";
 
-//     retString += " Speculative Table: \n";
-//     retString += "---------------------------\n";
-//     retString += speculativeTable.toString();
+       //      retString += " Speculative Table: \n";
+       //      retString += "---------------------------\n";
+       //      retString += speculativeTable.toString();
 
-//     return retString;
-// }
+       //      return retString;
+       // }
 
        public synchronized void addLocalCommunication(long arbitrator, String hostName, int portNumber) {
                localCommunicationTable.put(arbitrator, new Pair<String, Integer>(hostName, portNumber));
@@ -369,9 +386,16 @@ final public class Table {
                        validateAndUpdate(newSlots, false);
                        sendToServer(null);
 
+
+                       updateLiveTransactionsAndStatus();
+
                        return true;
                } catch (Exception e) {
                        // e.printStackTrace();
+
+                       for (Long m : localCommunicationTable.keySet()) {
+                               updateFromLocal(m);
+                       }
                }
 
                return false;
@@ -385,6 +409,7 @@ final public class Table {
                        }
 
                        NewKey newKey = new NewKey(null, keyName, machineId);
+
                        if (sendToServer(newKey)) {
                                // If successfully inserted
                                return true;
@@ -500,14 +525,224 @@ final public class Table {
                bufferResizeThreshold = resizeLower - 1 + random.nextInt(numberOfSlots - resizeLower);
        }
 
+       public long getLocalSequenceNumber() {
+               return localSequenceNumber;
+       }
+
+
+       boolean lastInsertedNewKey = false;
+
        private boolean sendToServer(NewKey newKey) throws ServerException {
 
+               boolean fromRetry = false;
+
+               try {
+                       if (hadPartialSendToServer) {
+                               Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
+                               if (newSlots.length == 0) {
+                                       fromRetry = true;
+                                       ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
+
+                                       if (sendSlotsReturn.getFirst()) {
+                                               if (newKey != null) {
+                                                       if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
+                                                               newKey = null;
+                                                       }
+                                               }
+
+                                               for (Transaction transaction : lastTransactionPartsSent.keySet()) {
+                                                       transaction.resetServerFailure();
+
+                                                       // Update which transactions parts still need to be sent
+                                                       transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
+
+                                                       // Add the transaction status to the outstanding list
+                                                       outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
+
+                                                       // Update the transaction status
+                                                       transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
+
+                                                       // Check if all the transaction parts were successfully sent and if so then remove it from pending
+                                                       if (transaction.didSendAllParts()) {
+                                                               transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
+                                                               pendingTransactionQueue.remove(transaction);
+                                                       }
+                                               }
+                                       } else {
+
+                                               newSlots = sendSlotsReturn.getThird();
+
+                                               boolean isInserted = false;
+                                               for (Slot s : newSlots) {
+                                                       if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
+                                                               isInserted = true;
+                                                               break;
+                                                       }
+                                               }
+
+                                               for (Slot s : newSlots) {
+                                                       if (isInserted) {
+                                                               break;
+                                                       }
+
+                                                       // Process each entry in the slot
+                                                       for (Entry entry : s.getEntries()) {
+
+                                                               if (entry.getType() == Entry.TypeLastMessage) {
+                                                                       LastMessage lastMessage = (LastMessage)entry;
+                                                                       if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) {
+                                                                               isInserted = true;
+                                                                               break;
+                                                                       }
+                                                               }
+                                                       }
+                                               }
+
+                                               if (isInserted) {
+                                                       if (newKey != null) {
+                                                               if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
+                                                                       newKey = null;
+                                                               }
+                                                       }
+
+                                                       for (Transaction transaction : lastTransactionPartsSent.keySet()) {
+                                                               transaction.resetServerFailure();
+
+                                                               // Update which transactions parts still need to be sent
+                                                               transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
+
+                                                               // Add the transaction status to the outstanding list
+                                                               outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
+
+                                                               // Update the transaction status
+                                                               transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
+
+                                                               // Check if all the transaction parts were successfully sent and if so then remove it from pending
+                                                               if (transaction.didSendAllParts()) {
+                                                                       transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
+                                                                       pendingTransactionQueue.remove(transaction);
+                                                               } else {
+                                                                       transaction.resetServerFailure();
+                                                                       // Set the transaction sequence number back to nothing
+                                                                       if (!transaction.didSendAPartToServer()) {
+                                                                               transaction.setSequenceNumber(-1);
+                                                                       }
+                                                               }
+                                                       }
+                                               }
+                                       }
+
+                                       for (Transaction transaction : lastTransactionPartsSent.keySet()) {
+                                               transaction.resetServerFailure();
+                                               // Set the transaction sequence number back to nothing
+                                               if (!transaction.didSendAPartToServer()) {
+                                                       transaction.setSequenceNumber(-1);
+                                               }
+                                       }
+
+                                       if (sendSlotsReturn.getThird().length != 0) {
+                                               // insert into the local block chain
+                                               validateAndUpdate(sendSlotsReturn.getThird(), true);
+                                       }
+                                       // continue;
+                               } else {
+                                       boolean isInserted = false;
+                                       for (Slot s : newSlots) {
+                                               if ((s.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
+                                                       isInserted = true;
+                                                       break;
+                                               }
+                                       }
+
+                                       for (Slot s : newSlots) {
+                                               if (isInserted) {
+                                                       break;
+                                               }
+
+                                               // Process each entry in the slot
+                                               for (Entry entry : s.getEntries()) {
+
+                                                       if (entry.getType() == Entry.TypeLastMessage) {
+                                                               LastMessage lastMessage = (LastMessage)entry;
+                                                               if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == lastSlotAttemptedToSend.getSequenceNumber())) {
+                                                                       isInserted = true;
+                                                                       break;
+                                                               }
+                                                       }
+                                               }
+                                       }
+
+                                       if (isInserted) {
+                                               if (newKey != null) {
+                                                       if (lastInsertedNewKey && (lastNewKey.getKey() == newKey.getKey()) && (lastNewKey.getMachineID() == newKey.getMachineID())) {
+                                                               newKey = null;
+                                                       }
+                                               }
+
+                                               for (Transaction transaction : lastTransactionPartsSent.keySet()) {
+                                                       transaction.resetServerFailure();
+
+                                                       // Update which transactions parts still need to be sent
+                                                       transaction.removeSentParts(lastTransactionPartsSent.get(transaction));
+
+                                                       // Add the transaction status to the outstanding list
+                                                       outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
+
+                                                       // Update the transaction status
+                                                       transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
+
+                                                       // Check if all the transaction parts were successfully sent and if so then remove it from pending
+                                                       if (transaction.didSendAllParts()) {
+                                                               transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
+                                                               pendingTransactionQueue.remove(transaction);
+                                                       } else {
+                                                               transaction.resetServerFailure();
+                                                               // Set the transaction sequence number back to nothing
+                                                               if (!transaction.didSendAPartToServer()) {
+                                                                       transaction.setSequenceNumber(-1);
+                                                               }
+                                                       }
+                                               }
+                                       } else {
+                                               for (Transaction transaction : lastTransactionPartsSent.keySet()) {
+                                                       transaction.resetServerFailure();
+                                                       // Set the transaction sequence number back to nothing
+                                                       if (!transaction.didSendAPartToServer()) {
+                                                               transaction.setSequenceNumber(-1);
+                                                       }
+                                               }
+                                       }
+
+                                       // insert into the local block chain
+                                       validateAndUpdate(newSlots, true);
+                               }
+                       }
+               } catch (ServerException e) {
+                       throw e;
+               }
+
+
+
                try {
                        // While we have stuff that needs inserting into the block chain
                        while ((pendingTransactionQueue.size() > 0) || (pendingSendArbitrationRounds.size() > 0) || (newKey != null)) {
 
+                               fromRetry = false;
+
+                               if (hadPartialSendToServer) {
+                                       throw new Error("Should Be error free");
+                               }
+
+
+
+                               // If there is a new key with same name then end
+                               if ((newKey != null) && (arbitratorTable.get(newKey.getKey()) != null)) {
+                                       return false;
+                               }
+
                                // Create the slot
-                               Slot slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer.getSlot(sequenceNumber).getHMAC());
+                               Slot slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer.getSlot(sequenceNumber).getHMAC(), localSequenceNumber);
+                               localSequenceNumber++;
 
                                // Try to fill the slot with data
                                ThreeTuple<Boolean, Integer, Boolean> fillSlotsReturn = fillSlot(slot, false, newKey);
@@ -521,7 +756,7 @@ final public class Table {
                                                transaction.resetNextPartToSend();
 
                                                // Set the transaction sequence number back to nothing
-                                               if (!transaction.didSendAPartToServer()) {
+                                               if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
                                                        transaction.setSequenceNumber(-1);
                                                }
                                        }
@@ -534,13 +769,22 @@ final public class Table {
                                        fillSlot(slot, true, newKey);
                                }
 
-                               // Try to send to the server
+                               lastSlotAttemptedToSend = slot;
+                               lastIsNewKey = (newKey != null);
+                               lastInsertedNewKey = insertedNewKey;
+                               lastNewSize = newSize;
+                               lastNewKey = newKey;
+                               lastTransactionPartsSent = new HashMap<Transaction, List<Integer>>(transactionPartsSent);
+                               lastPendingSendArbitrationEntriesToDelete = new ArrayList<Entry>(pendingSendArbitrationEntriesToDelete);
+
+
                                ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != null);
 
-                               if (/*sendSlotsReturn.getSecond() || */sendSlotsReturn.getFirst()) {
+                               if (sendSlotsReturn.getFirst()) {
+
                                        // Did insert into the block chain
 
-                                       if (sendSlotsReturn.getFirst()) {
+                                       if (insertedNewKey) {
                                                // This slot was what was inserted not a previous slot
 
                                                // New Key was successfully inserted into the block chain so dont want to insert it again
@@ -557,13 +801,10 @@ final public class Table {
                                                        iter.remove();
                                                }
                                        }
-                                       
-                                       for (Transaction transaction : transactionPartsSent.keySet()) {
-
 
+                                       for (Transaction transaction : transactionPartsSent.keySet()) {
                                                transaction.resetServerFailure();
 
-
                                                // Update which transactions parts still need to be sent
                                                transaction.removeSentParts(transactionPartsSent.get(transaction));
 
@@ -580,13 +821,43 @@ final public class Table {
                                                }
                                        }
                                } else {
+
+                                       // if (!sendSlotsReturn.getSecond()) {
+                                       //      for (Transaction transaction : lastTransactionPartsSent.keySet()) {
+                                       //              transaction.resetServerFailure();
+                                       //      }
+                                       // } else {
+                                       //      for (Transaction transaction : lastTransactionPartsSent.keySet()) {
+                                       //              transaction.resetServerFailure();
+
+                                       //              // Update which transactions parts still need to be sent
+                                       //              transaction.removeSentParts(transactionPartsSent.get(transaction));
+
+                                       //              // Add the transaction status to the outstanding list
+                                       //              outstandingTransactionStatus.put(transaction.getSequenceNumber(), transaction.getTransactionStatus());
+
+                                       //              // Update the transaction status
+                                       //              transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentPartial);
+
+                                       //              // Check if all the transaction parts were successfully sent and if so then remove it from pending
+                                       //              if (transaction.didSendAllParts()) {
+                                       //                      transaction.getTransactionStatus().setStatus(TransactionStatus.StatusSentFully);
+                                       //                      pendingTransactionQueue.remove(transaction);
+
+                                       //                      for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
+                                       //                              System.out.println("Sent: " + kv + "  from: " + localMachineId + "   Slot:" + lastSlotAttemptedToSend.getSequenceNumber() + "  Claimed:" + transaction.getSequenceNumber());
+                                       //                      }
+                                       //              }
+                                       //      }
+                                       // }
+
                                        // Reset which transaction to send
                                        for (Transaction transaction : transactionPartsSent.keySet()) {
                                                transaction.resetNextPartToSend();
-                                               transaction.resetNextPartToSend();
+                                               // transaction.resetNextPartToSend();
 
                                                // Set the transaction sequence number back to nothing
-                                               if (!transaction.didSendAPartToServer()) {
+                                               if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
                                                        transaction.setSequenceNumber(-1);
                                                }
                                        }
@@ -601,10 +872,8 @@ final public class Table {
                                        validateAndUpdate(sendSlotsReturn.getThird(), true);
                                }
                        }
-               } catch (ServerException e) {
-
-                       System.out.println("Server Failure:   " + e.getType());
 
+               } catch (ServerException e) {
 
                        if (e.getType() != ServerException.TypeInputTimeout) {
                                // e.printStackTrace();
@@ -614,7 +883,7 @@ final public class Table {
                                        transaction.resetNextPartToSend();
 
                                        // Set the transaction sequence number back to nothing
-                                       if (!transaction.didSendAPartToServer()) {
+                                       if (!transaction.didSendAPartToServer() && !transaction.getServerFailure()) {
                                                transaction.setSequenceNumber(-1);
                                        }
                                }
@@ -622,6 +891,12 @@ final public class Table {
                                // There was a partial send to the server
                                hadPartialSendToServer = true;
 
+
+                               // if (!fromRetry) {
+                               //      lastTransactionPartsSent = new HashMap<Transaction, List<Integer>>(transactionPartsSent);
+                               //      lastPendingSendArbitrationEntriesToDelete = new ArrayList<Entry>(pendingSendArbitrationEntriesToDelete);
+                               // }
+
                                // Nothing was able to be sent to the server so just clear these data structures
                                for (Transaction transaction : transactionPartsSent.keySet()) {
                                        transaction.resetNextPartToSend();
@@ -638,7 +913,7 @@ final public class Table {
                return newKey == null;
        }
 
-       public synchronized boolean updateFromLocal(long machineId) {
+       private synchronized boolean updateFromLocal(long machineId) {
                Pair<String, Integer> localCommunicationInformation = localCommunicationTable.get(machineId);
                if (localCommunicationInformation == null) {
                        // Cant talk to that device locally so do nothing
@@ -661,7 +936,8 @@ final public class Table {
                bbEncode.putInt(0);
 
                // Send by local
-               byte[] returnData = cloud.sendLocalData(sendData, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
+               byte[] returnData = cloud.sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
+               localSequenceNumber++;
 
                if (returnData == null) {
                        // Could not contact server
@@ -695,7 +971,7 @@ final public class Table {
 
                if (localCommunicationInformation == null) {
                        // Cant talk to that device locally so do nothing
-                       return new Pair<Boolean, Boolean>(false, false);
+                       return new Pair<Boolean, Boolean>(true, false);
                }
 
                // Get the size of the send data
@@ -720,8 +996,10 @@ final public class Table {
                        part.encode(bbEncode);
                }
 
+
                // Send by local
-               byte[] returnData = cloud.sendLocalData(sendData, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
+               byte[] returnData = cloud.sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
+               localSequenceNumber++;
 
                if (returnData == null) {
                        // Could not contact server
@@ -773,6 +1051,7 @@ final public class Table {
        }
 
        public synchronized byte[] acceptDataFromLocal(byte[] data) {
+
                // Decode the data
                ByteBuffer bbDecode = ByteBuffer.wrap(data);
                long lastArbitratedSequenceNumberSeen = bbDecode.getLong();
@@ -873,6 +1152,8 @@ final public class Table {
                        entry.encode(bbEncode);
                }
 
+
+               localSequenceNumber++;
                return returnData;
        }
 
@@ -943,9 +1224,12 @@ final public class Table {
         * Returns false if a resize was needed
         */
        private ThreeTuple<Boolean, Integer, Boolean> fillSlot(Slot slot, boolean resize, NewKey newKeyEntry) {
+
+
                int newSize = 0;
                if (liveSlotCount > bufferResizeThreshold) {
                        resize = true; //Resize is forced
+
                }
 
                if (resize) {
@@ -974,6 +1258,7 @@ final public class Table {
                if (newKeyEntry != null) {
                        newKeyEntry.setSlot(slot);
                        if (slot.hasSpace(newKeyEntry)) {
+
                                slot.addEntry(newKeyEntry);
                                inserted = true;
                        }
@@ -1017,10 +1302,15 @@ final public class Table {
                        Transaction transaction = pendingTransactionQueue.get(0);
 
                        // Set the transaction sequence number if it has yet to be inserted into the block chain
-                       if ((!transaction.didSendAPartToServer() && !transaction.getServerFailure()) || (transaction.getSequenceNumber() == -1)) {
+                       // if ((!transaction.didSendAPartToServer() && !transaction.getServerFailure()) || (transaction.getSequenceNumber() == -1)) {
+                       //      transaction.setSequenceNumber(slot.getSequenceNumber());
+                       // }
+
+                       if ((!transaction.didSendAPartToServer()) || (transaction.getSequenceNumber() == -1)) {
                                transaction.setSequenceNumber(slot.getSequenceNumber());
                        }
 
+
                        while (true) {
                                TransactionPart part = transaction.getNextPartToSend();
 
@@ -1044,53 +1334,6 @@ final public class Table {
                        }
                }
 
-
-               // // Insert as many transactions as possible while keeping order
-               // for (Transaction transaction : pendingTransactionQueue) {
-
-               //      // Set the transaction sequence number if it has yet to be inserted into the block chain
-               //      if (!transaction.didSendAPartToServer()) {
-               //              transaction.setSequenceNumber(slot.getSequenceNumber());
-               //      }
-
-               //      boolean ranOutOfSpace = false;
-
-               //      while (true) {
-               //              TransactionPart part = transaction.getNextPartToSend();
-
-               //              if (part == null) {
-               //                      // Ran out of parts to send for this transaction so move on
-               //                      break;
-               //              }
-
-               //              if (slot.hasSpace(part)) {
-               //                      slot.addEntry(part);
-               //                      List<Integer> partsSent = transactionPartsSent.get(transaction);
-               //                      if (partsSent == null) {
-               //                              partsSent = new ArrayList<Integer>();
-               //                              transactionPartsSent.put(transaction, partsSent);
-               //                      }
-               //                      partsSent.add(part.getPartNumber());
-               //                      transactionPartsSent.put(transaction, partsSent);
-
-               //                      for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
-               //                              System.out.println("Inserted Into Slot: " + kv);
-               //                      }
-
-               //                      ranOutOfSpace = true;
-               //                      break;
-
-               //              } else {
-               //                      ranOutOfSpace = true;
-               //                      break;
-               //              }
-               //      }
-
-               //      if (ranOutOfSpace) {
-               //              break;
-               //      }
-               // }
-
                // Fill the remainder of the slot with rescue data
                doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
 
@@ -1106,7 +1349,7 @@ final public class Table {
                        long old_seqn = rejectedSlotList.firstElement();
                        if (rejectedSlotList.size() > REJECTED_THRESHOLD) {
                                long new_seqn = rejectedSlotList.lastElement();
-                               RejectedMessage rm = new RejectedMessage(s, localMachineId, old_seqn, new_seqn, false);
+                               RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
                                s.addEntry(rm);
                        } else {
                                long prev_seqn = -1;
@@ -1121,7 +1364,7 @@ final public class Table {
                                }
                                /* Generate rejected message entry for missing messages */
                                if (prev_seqn != -1) {
-                                       RejectedMessage rm = new RejectedMessage(s, localMachineId, old_seqn, prev_seqn, false);
+                                       RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
                                        s.addEntry(rm);
                                }
                                /* Generate rejected message entries for present messages */
@@ -1129,7 +1372,7 @@ final public class Table {
                                        long curr_seqn = rejectedSlotList.get(i);
                                        Slot s_msg = buffer.getSlot(curr_seqn);
                                        long machineid = s_msg.getMachineID();
-                                       RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
+                                       RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
                                        s.addEntry(rm);
                                }
                        }
@@ -1225,11 +1468,6 @@ final public class Table {
                        return;
                }
 
-               // Reset the table status declared sizes
-               smallestTableStatusSeen = -1;
-               largestTableStatusSeen = -1;
-
-
                // Make sure all slots are newer than the last largest slot this client has seen
                long firstSeqNum = newSlots[0].getSequenceNumber();
                if (firstSeqNum <= sequenceNumber) {
@@ -1249,6 +1487,8 @@ final public class Table {
                // Process each slots data
                for (Slot slot : newSlots) {
                        processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
+
+                       updateExpectedSize();
                }
 
                // If there is a gap, check to see if the server sent us everything.
@@ -1320,6 +1560,37 @@ final public class Table {
                updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
        }
 
+       private void initExpectedSize(long firstSequenceNumber, long numberOfSlots) {
+               // if (didFindTableStatus) {
+               // return;
+               // }
+               long prevslots = firstSequenceNumber;
+
+
+               if (didFindTableStatus) {
+                       // expectedsize = (prevslots < ((long) numberOfSlots)) ? (int) prevslots : expectedsize;
+                       // System.out.println("Here2: " + expectedsize + "    " + numberOfSlots + "   " + prevslots);
+
+               } else {
+                       expectedsize = (prevslots < ((long) numberOfSlots)) ? (int) prevslots : numberOfSlots;
+                       // System.out.println("Here: " + expectedsize);
+               }
+
+               // System.out.println(numberOfSlots);
+
+               didFindTableStatus = true;
+               currMaxSize = numberOfSlots;
+       }
+
+       private void updateExpectedSize() {
+               expectedsize++;
+
+               if (expectedsize > currMaxSize) {
+                       expectedsize = currMaxSize;
+               }
+       }
+
+
        /**
         * Check the size of the block chain to make sure there are enough slots sent back by the server.
         * This is only called when we have a gap between the slots that we have locally and the slots
@@ -1327,41 +1598,30 @@ final public class Table {
         * status message
         */
        private void checkNumSlots(int numberOfSlots) {
-
-               // We only have 1 size so we must have this many slots
-               if (largestTableStatusSeen == smallestTableStatusSeen) {
-                       if (numberOfSlots != smallestTableStatusSeen) {
-                               throw new Error("Server Error: Server did not send all slots.  Expected: " + smallestTableStatusSeen + " Received:" + numberOfSlots);
-                       }
-               } else {
-                       // We have more than 1
-                       if (numberOfSlots < smallestTableStatusSeen) {
-                               throw new Error("Server Error: Server did not send all slots.  Expected at least: " + smallestTableStatusSeen + " Received:" + numberOfSlots);
-                       }
+               if (numberOfSlots != expectedsize) {
+                       throw new Error("Server Error: Server did not send all slots.  Expected: " + expectedsize + " Received:" + numberOfSlots);
                }
        }
 
+       private void updateCurrMaxSize(int newmaxsize) {
+               currMaxSize = newmaxsize;
+       }
+
+
        /**
         * Update the size of of the local buffer if it is needed.
         */
        private void commitNewMaxSize() {
-
-               int currMaxSize = 0;
-
-               if (largestTableStatusSeen == -1) {
-                       // No table status seen so the current max size does not change
-                       currMaxSize = numberOfSlots;
-               } else {
-                       currMaxSize = largestTableStatusSeen;
-               }
+               didFindTableStatus = false;
 
                // Resize the local slot buffer
                if (numberOfSlots != currMaxSize) {
-                       buffer.resize(currMaxSize);
+                       buffer.resize((int)currMaxSize);
                }
 
                // Change the number of local slots to the new size
-               numberOfSlots = currMaxSize;
+               numberOfSlots = (int)currMaxSize;
+
 
                // Recalculate the resize threshold since the size of the local buffer has changed
                setResizeThreshold();
@@ -1406,12 +1666,6 @@ final public class Table {
 
                                // Add that part to the transaction
                                transaction.addPartDecode(part);
-
-                               if (transaction.isComplete()) {
-                                       for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
-                                               System.out.println("Got Live Transaction   " + kv + "    " + part.getSequenceNumber());
-                                       }
-                               }
                        }
                }
 
@@ -1419,6 +1673,9 @@ final public class Table {
                newTransactionParts.clear();
        }
 
+
+       private long lastSeqNumArbOn = 0;
+
        private void arbitrateFromServer() {
 
                if (liveTransactionBySequenceNumberTable.size() == 0) {
@@ -1440,11 +1697,17 @@ final public class Table {
                for (Long transactionSequenceNumber : transactionSequenceNumbers) {
                        Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
 
+
+
                        // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
                        if (transaction.getArbitrator() != localMachineId) {
                                continue;
                        }
 
+                       if (transactionSequenceNumber < lastSeqNumArbOn) {
+                               continue;
+                       }
+
                        if (offlineTransactionsCommittedAndAtServer.contains(transaction.getId())) {
                                // We have seen this already locally so dont commit again
                                continue;
@@ -1457,6 +1720,7 @@ final public class Table {
                                break;
                        }
 
+
                        // update the largest transaction seen by arbitrator from server
                        if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) == null) {
                                lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber());
@@ -1467,11 +1731,6 @@ final public class Table {
                                }
                        }
 
-
-                       for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
-                               System.out.println("Arbitrating on:     " + kv);
-                       }
-
                        if (transaction.evaluateGuard(committedKeyValueTable, speculativeTableTmp, null)) {
                                // Guard evaluated as true
 
@@ -1481,7 +1740,7 @@ final public class Table {
                                }
 
                                // Update what the last transaction committed was for use in batch commit
-                               lastTransactionCommitted = transaction.getSequenceNumber();
+                               lastTransactionCommitted = transactionSequenceNumber;
                        } else {
                                // Guard evaluated was false so create abort
 
@@ -1498,11 +1757,11 @@ final public class Table {
 
                                // Insert the abort so we can process
                                processEntry(newAbort);
-
-                               for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
-                                       System.out.println("Abort From Server!!!!!!     " + kv);
-                               }
                        }
+
+                       lastSeqNumArbOn = transactionSequenceNumber;
+
+                       // liveTransactionBySequenceNumberTable.remove(transactionSequenceNumber);
                }
 
                Commit newCommit = null;
@@ -1536,8 +1795,10 @@ final public class Table {
 
                        if (compactArbitrationData()) {
                                ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
-                               for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
-                                       processEntry(commitPart);
+                               if (newArbitrationRound.getCommit() != null) {
+                                       for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
+                                               processEntry(commitPart);
+                                       }
                                }
                        }
                }
@@ -1617,7 +1878,6 @@ final public class Table {
                                        status.setStatus(TransactionStatus.StatusAborted);
                                }
                        } else {
-
                                Set addAbortSet = new HashSet<Abort>();
 
 
@@ -1722,7 +1982,7 @@ final public class Table {
                                pendingSendArbitrationRounds.clear();
                        } else {
                                for (int i = 0; i < numberToDelete; i++) {
-                                       pendingSendArbitrationRounds.remove(pendingSendArbitrationRounds.size() - 1);
+                                       pendingSendArbitrationRounds.removeIndex(pendingSendArbitrationRounds.size() - 1);
                                }
                        }
 
@@ -1799,6 +2059,12 @@ final public class Table {
                        List<Long> commitSequenceNumbers = new ArrayList<Long>(commitForClientTable.keySet());
                        Collections.sort(commitSequenceNumbers);
 
+                       // Get the last commit seen from this arbitrator
+                       long lastCommitSeenSequenceNumber = -1;
+                       if (lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId) != null) {
+                               lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable.get(arbitratorId);
+                       }
+
                        // Go through each new commit one by one
                        for (int i = 0; i < commitSequenceNumbers.size(); i++) {
                                Long commitSequenceNumber = commitSequenceNumbers.get(i);
@@ -1818,10 +2084,14 @@ final public class Table {
                                        }
                                }
 
-                               // Get the last commit seen from this arbitrator
-                               long lastCommitSeenSequenceNumber = -1;
-                               if (lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId()) != null) {
-                                       lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId());
+                               // Update the last transaction that was updated if we can
+                               if (commit.getTransactionSequenceNumber() != -1) {
+                                       Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
+
+                                       // Update the last transaction sequence number that the arbitrator arbitrated on
+                                       if ((lastTransactionNumber == null) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
+                                               lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
+                                       }
                                }
 
                                // Update the last arbitration data that we have seen so far
@@ -1885,25 +2155,13 @@ final public class Table {
                                        if (commit.getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId())) {
                                                lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
                                        }
-                               }
-
-
-
-                               // Update the last transaction that was updated if we can
-                               if (commit.getTransactionSequenceNumber() != -1) {
-                                       Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(commit.getMachineId());
-
-                                       // Update the last transaction sequence number that the arbitrator arbitrated on
-                                       if ((lastTransactionNumber == null) || (lastTransactionNumber < commit.getTransactionSequenceNumber())) {
-                                               lastArbitratedTransactionNumberByArbitratorTable.put(commit.getMachineId(), commit.getTransactionSequenceNumber());
-                                       }
+                               } else {
+                                       lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
                                }
 
                                // We processed a new commit that we havent seen before
                                didProcessANewCommit = true;
 
-
-
                                // Update the committed table of keys and which commit is using which key
                                for (KeyValue kv : commit.getKeyValueUpdateSet()) {
                                        committedKeyValueTable.put(kv.getKey(), kv);
@@ -2107,7 +2365,7 @@ final public class Table {
                                break;
 
                        case Entry.TypeTableStatus:
-                               processEntry((TableStatus)entry);
+                               processEntry((TableStatus)entry, slot.getSequenceNumber());
                                break;
 
                        default:
@@ -2145,8 +2403,11 @@ final public class Table {
         * keeps track of the largest and smallest table status seen in this current round
         * of updating the local copy of the block chain
         */
-       private void processEntry(TableStatus entry) {
+       private void processEntry(TableStatus entry, long seq) {
                int newNumSlots = entry.getMaxSlots();
+               updateCurrMaxSize(newNumSlots);
+
+               initExpectedSize(seq, newNumSlots);
 
                if (liveTableStatus != null) {
                        // We have a larger table status so the old table status is no longer alive
@@ -2155,14 +2416,6 @@ final public class Table {
 
                // Make this new table status the latest alive table status
                liveTableStatus = entry;
-
-               if ((smallestTableStatusSeen == -1) || (newNumSlots < smallestTableStatusSeen)) {
-                       smallestTableStatusSeen = newNumSlots;
-               }
-
-               if ((largestTableStatusSeen == -1) || (newNumSlots > largestTableStatusSeen)) {
-                       largestTableStatusSeen = newNumSlots;
-               }
        }
 
        /**
@@ -2173,6 +2426,7 @@ final public class Table {
                long newSeqNum = entry.getNewSeqNum();
                boolean isequal = entry.getEqual();
                long machineId = entry.getMachineID();
+               long seq = entry.getSequenceNumber();
 
 
                // Check if we have messages that were supposed to be rejected in our local block chain
@@ -2208,7 +2462,7 @@ final public class Table {
                        Pair<Long, Liveness> lastMessageValue = lastMessageEntry.getValue();
                        long entrySequenceNumber = lastMessageValue.getFirst();
 
-                       if (entrySequenceNumber < newSeqNum) {
+                       if (entrySequenceNumber < seq) {
 
                                // Add this rejected message to the set of messages that this machine ID did not see yet
                                addWatchList(lastMessageEntryMachineId, entry);
@@ -2331,6 +2585,21 @@ final public class Table {
         * Process new commit entries and save them for future use.  Delete duplicates
         */
        private void processEntry(CommitPart entry) {
+
+
+               // Update the last transaction that was updated if we can
+               if (entry.getTransactionSequenceNumber() != -1) {
+                       Long lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable.get(entry.getMachineId());
+
+                       // Update the last transaction sequence number that the arbitrator arbitrated on
+                       if ((lastTransactionNumber == null) || (lastTransactionNumber < entry.getTransactionSequenceNumber())) {
+                               lastArbitratedTransactionNumberByArbitratorTable.put(entry.getMachineId(), entry.getTransactionSequenceNumber());
+                       }
+               }
+
+
+
+
                Map<Pair<Long, Integer>, CommitPart> commitPart = newCommitParts.get(entry.getMachineId());
 
                if (commitPart == null) {
@@ -2369,7 +2638,7 @@ final public class Table {
                                RejectedMessage rm = rmit.next();
 
                                // If this machine Id has seen this rejected message...
-                               if (rm.getNewSeqNum() <= seqNum) {
+                               if (rm.getSequenceNumber() <= seqNum) {
 
                                        // Remove it from our watchlist
                                        rmit.remove();
@@ -2476,4 +2745,4 @@ final public class Table {
                                throw new Error("Server Error: Invalid HMAC Chain" + currSlot + " " + prevSlot);
                }
        }
-}
\ No newline at end of file
+}