API Changes
[iotcloud.git] / version2 / src / java / iotcloud / Table.java
index 6aeb28e3628c57f611e56921717e83f924bad66a..be9defb91cef43d5921db27bd9f0ed3e9e3bb02e 100644 (file)
@@ -13,6 +13,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.Collection;
 import java.util.Collections;
+import java.nio.ByteBuffer;
 
 
 /**
@@ -24,9 +25,6 @@ import java.util.Collections;
 final public class Table {
        private int numslots;   //number of slots stored in buffer
 
-       //table of key-value pairs
-       //private HashMap<IoTString, KeyValue> table = new HashMap<IoTString, KeyValue>();
-
        // machine id -> (sequence number, Slot or LastMessage); records last message by each client
        private HashMap<Long, Pair<Long, Liveness> > lastmessagetable = new HashMap<Long, Pair<Long, Liveness> >();
        // machine id -> ...
@@ -52,10 +50,12 @@ final public class Table {
        private int smallestTableStatusSeen = -1;
        private int largestTableStatusSeen = -1;
        private int lastSeenPendingTransactionSpeculateIndex = 0;
+       private int commitSequenceNumber = 0;
+       private long localTransactionSequenceNumber = 0;
 
        private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
        private LinkedList<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
-       private Map<Long, Commit> commitMap = null; // List of all the most recent live commits
+       private Map<Long, Map<Long, Commit>> commitMap = null; // List of all the most recent live commits
        private Map<Long, Abort> abortMap = null; // Set of the live aborts
        private Map<IoTString, Commit> committedMapByKey = null; // Table of committed KV
        private Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV
@@ -63,20 +63,24 @@ final public class Table {
        private Map<Long, Transaction> uncommittedTransactionsMap = null;
        private Map<IoTString, Long> arbitratorTable = null; // Table of arbitrators
        private Map<IoTString, NewKey> newKeyTable = null; // Table of speculative KV
-       private Map<Long, Commit> newCommitMap = null; // Map of all the new commits
+       private Map<Long, Map<Long, Commit>> newCommitMap = null; // Map of all the new commits
        private Map<Long, Long> lastCommitSeenSeqNumMap = null; // sequence number of the last commit that was seen grouped by arbitrator
-       private Map<Long, Long> lastAbortSeenSeqNumMap = null; // sequence number of the last commit that was seen grouped by arbitrator
+       private Map<Long, Long> lastCommitSeenTransSeqNumMap = null; // transaction sequence number of the last commit that was seen grouped by arbitrator
+       private Map<Long, Long> lastAbortSeenSeqNumMap = null; // sequence number of the last abort that was seen grouped by arbitrator
        private Map<IoTString, KeyValue> pendingTransSpeculativeTable = null;
+       private List<Commit> pendingCommitsList = null;
+       private List<Commit> pendingCommitsToDelete = null;
+       private Map<Long, LocalComm> localCommunicationChannels;
+       private Map<Long, TransactionStatus> transactionStatusMap = null;
 
 
-
-       public Table(String baseurl, String password, long _localmachineid) {
+       public Table(String hostname, String baseurl, String password, long _localmachineid) {
                localmachineid = _localmachineid;
                buffer = new SlotBuffer();
                numslots = buffer.capacity();
                setResizeThreshold();
                sequencenumber = 0;
-               cloud = new CloudComm(this, baseurl, password);
+               cloud = new CloudComm(this, hostname, baseurl, password);
                lastliveslotseqn = 1;
 
                setupDataStructs();
@@ -95,7 +99,7 @@ final public class Table {
 
        private void setupDataStructs() {
                pendingTransQueue = new LinkedList<PendingTransaction>();
-               commitMap = new HashMap<Long, Commit>();
+               commitMap = new HashMap<Long, Map<Long, Commit>>();
                abortMap = new HashMap<Long, Abort>();
                committedMapByKey = new HashMap<IoTString, Commit>();
                commitedTable = new HashMap<IoTString, KeyValue>();
@@ -103,10 +107,30 @@ final public class Table {
                uncommittedTransactionsMap = new HashMap<Long, Transaction>();
                arbitratorTable = new HashMap<IoTString, Long>();
                newKeyTable = new HashMap<IoTString, NewKey>();
-               newCommitMap = new HashMap<Long, Commit>();
+               newCommitMap = new HashMap<Long, Map<Long, Commit>>();
                lastCommitSeenSeqNumMap = new HashMap<Long, Long>();
+               lastCommitSeenTransSeqNumMap = new HashMap<Long, Long>();
                lastAbortSeenSeqNumMap = new HashMap<Long, Long>();
                pendingTransSpeculativeTable = new HashMap<IoTString, KeyValue>();
+               pendingCommitsList = new LinkedList<Commit>();
+               pendingCommitsToDelete = new LinkedList<Commit>();
+               localCommunicationChannels = new HashMap<Long, LocalComm>();
+               transactionStatusMap = new HashMap<Long, TransactionStatus>();
+       }
+
+       public void initTable() throws ServerException {
+               cloud.setSalt();//Set the salt
+               Slot s = new Slot(this, 1, localmachineid);
+               TableStatus status = new TableStatus(s, numslots);
+               s.addEntry(status);
+               Slot[] array = cloud.putSlot(s, numslots);
+               if (array == null) {
+                       array = new Slot[] {s};
+                       /* update data structure */
+                       validateandupdate(array, true);
+               } else {
+                       throw new Error("Error on initialization");
+               }
        }
 
        public void rebuild() throws ServerException {
@@ -114,45 +138,112 @@ final public class Table {
                validateandupdate(newslots, true);
        }
 
-       // // TODO: delete method
-       // public void printSlots() {
-       //      long o = buffer.getOldestSeqNum();
-       //      long n = buffer.getNewestSeqNum();
-
-       //      int[] types = new int[10];
-
-       //      int num = 0;
-
-       //      int livec = 0;
-       //      int deadc = 0;
-       //      for (long i = o; i < (n + 1); i++) {
-       //              Slot s = buffer.getSlot(i);
-
-       //              Vector<Entry> entries = s.getEntries();
-
-       //              for (Entry e : entries) {
-       //                      if (e.isLive()) {
-       //                              int type = e.getType();
-       //                              types[type] = types[type] + 1;
-       //                              num++;
-       //                              livec++;
-       //                      } else {
-       //                              deadc++;
-       //                      }
-       //              }
-       //      }
-
-       //      for (int i = 0; i < 10; i++) {
-       //              System.out.println(i + "    " + types[i]);
-       //      }
-       //      System.out.println("Live count:   " + livec);
-       //      System.out.println("Dead count:   " + deadc);
-       //      System.out.println("Old:   " + o);
-       //      System.out.println("New:   " + n);
-       //      System.out.println("Size:   " + buffer.size());
-       //      System.out.println("Commits Map:   " + commitedTable.size());
-       //      System.out.println("Commits List:   " + commitMap.size());
-       // }
+       // TODO: delete method
+       public void printSlots() {
+               long o = buffer.getOldestSeqNum();
+               long n = buffer.getNewestSeqNum();
+
+               int[] types = new int[10];
+
+               int num = 0;
+
+               int livec = 0;
+               int deadc = 0;
+               for (long i = o; i < (n + 1); i++) {
+                       Slot s = buffer.getSlot(i);
+
+                       Vector<Entry> entries = s.getEntries();
+
+                       for (Entry e : entries) {
+                               if (e.isLive()) {
+                                       int type = e.getType();
+                                       types[type] = types[type] + 1;
+                                       num++;
+                                       livec++;
+                               } else {
+                                       deadc++;
+                               }
+                       }
+               }
+
+               for (int i = 0; i < 10; i++) {
+                       System.out.println(i + "    " + types[i]);
+               }
+               System.out.println("Live count:   " + livec);
+               System.out.println("Dead count:   " + deadc);
+               System.out.println("Old:   " + o);
+               System.out.println("New:   " + n);
+               System.out.println("Size:   " + buffer.size());
+               System.out.println("Commits Key Map:   " + commitedTable.size());
+               // System.out.println("Commits Live Map:   " + commitMap.size());
+               System.out.println("Pending:   " + pendingTransQueue.size());
+
+               // List<IoTString> strList = new ArrayList<IoTString>();
+               // for (int i = 0; i < 100; i++) {
+               //      String keyA = "a" + i;
+               //      String keyB = "b" + i;
+               //      String keyC = "c" + i;
+               //      String keyD = "d" + i;
+
+               //      IoTString iKeyA = new IoTString(keyA);
+               //      IoTString iKeyB = new IoTString(keyB);
+               //      IoTString iKeyC = new IoTString(keyC);
+               //      IoTString iKeyD = new IoTString(keyD);
+
+               //      strList.add(iKeyA);
+               //      strList.add(iKeyB);
+               //      strList.add(iKeyC);
+               //      strList.add(iKeyD);
+               // }
+
+
+               // for (Long l : commitMap.keySet()) {
+               //      for (Long l2 : commitMap.get(l).keySet()) {
+               //              for (KeyValue kv : commitMap.get(l).get(l2).getkeyValueUpdateSet()) {
+               //                      strList.remove(kv.getKey());
+               //                      System.out.print(kv.getKey() + "    ");
+               //              }
+               //      }
+               // }
+
+               // System.out.println();
+               // System.out.println();
+
+               // for (IoTString s : strList) {
+               //      System.out.print(s + "    ");
+               // }
+               // System.out.println();
+               // System.out.println(strList.size());
+       }
+
+       public long getId() {
+               return localmachineid;
+       }
+
+       public boolean hasConnection() {
+               return cloud.hasConnection();
+       }
+
+       public String toString() {
+               String retString = " Committed Table: \n";
+               retString += "---------------------------\n";
+               retString += commitedTable.toString();
+
+               retString += "\n\n";
+
+               retString += " Speculative Table: \n";
+               retString += "---------------------------\n";
+               retString += speculativeTable.toString();
+
+               return retString;
+       }
+
+       public void addLocalComm(long machineId, LocalComm lc) {
+               localCommunicationChannels.put(machineId, lc);
+       }
+       public Long getArbitrator(IoTString key) {
+               return arbitratorTable.get(key);
+       }
 
        public IoTString getCommitted(IoTString key) {
                KeyValue kv = commitedTable.get(key);
@@ -234,37 +325,24 @@ final public class Table {
                }
        }
 
-       public Long getArbitrator(IoTString key) {
-               return arbitratorTable.get(key);
-       }
-
-       public void initTable() throws ServerException {
-               cloud.setSalt();//Set the salt
-               Slot s = new Slot(this, 1, localmachineid);
-               TableStatus status = new TableStatus(s, numslots);
-               s.addEntry(status);
-               Slot[] array = cloud.putSlot(s, numslots);
-               if (array == null) {
-                       array = new Slot[] {s};
-                       /* update data structure */
-                       validateandupdate(array, true);
-               } else {
-                       throw new Error("Error on initialization");
-               }
-       }
+       public void update() {
+               try {
+                       Slot[] newslots = cloud.getSlots(sequencenumber + 1);
+                       validateandupdate(newslots, false);
 
-       public String toString() {
-               String retString = " Committed Table: \n";
-               retString += "---------------------------\n";
-               retString += commitedTable.toString();
+                       if (!pendingTransQueue.isEmpty()) {
 
-               retString += "\n\n";
+                               // We have a pending transaction so do full insertion
+                               processPendingTrans();
+                       } else {
 
-               retString += " Speculative Table: \n";
-               retString += "---------------------------\n";
-               retString += speculativeTable.toString();
+                               // We dont have a pending transaction so do minimal effort
+                               updateWithNotPendingTrans();
+                       }
 
-               return retString;
+               } catch (Exception e) {
+                       // could not update so do nothing
+               }
        }
 
        public void startTransaction() {
@@ -272,150 +350,372 @@ final public class Table {
                pendingTransBuild = new PendingTransaction();
        }
 
-       public void commitTransaction() throws ServerException {
+       public void addKV(IoTString key, IoTString value) {
+
+               if (arbitratorTable.get(key) == null) {
+                       throw new Error("Key not Found.");
+               }
+
+               // Make sure new key value pair matches the current arbitrator
+               if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
+                       // TODO: Maybe not throw en error
+                       throw new Error("Not all Key Values Match Arbitrator.");
+               }
+
+               KeyValue kv = new KeyValue(key, value);
+               pendingTransBuild.addKV(kv);
+       }
+
+       public TransactionStatus commitTransaction() {
 
                if (pendingTransBuild.getKVUpdates().size() == 0) {
-                       // If no updates are made then there is no point inserting into the chain
-                       return;
+
+                       // transaction with no updates will have no effect on the system
+                       return new TransactionStatus(TransactionStatus.StatusNoEffect, -1);
                }
 
-               // Add the pending transaction to the queue
-               pendingTransQueue.add(pendingTransBuild);
+               TransactionStatus transStatus = null;
 
-               for (int i = lastSeenPendingTransactionSpeculateIndex; i < pendingTransQueue.size(); i++) {
-                       PendingTransaction pt = pendingTransQueue.get(i);
+               if (pendingTransBuild.getArbitrator() != localmachineid) {
 
-                       if (pt.evaluateGuard(commitedTable, speculativeTable, pendingTransSpeculativeTable)) {
+                       // set the local sequence number so we can recognize this transaction later
+                       pendingTransBuild.setMachineLocalTransSeqNum(localTransactionSequenceNumber);
+                       localTransactionSequenceNumber++;
 
-                               lastSeenPendingTransactionSpeculateIndex = i;
+                       transStatus = new TransactionStatus(TransactionStatus.StatusPending, pendingTransBuild.getArbitrator());
+                       transactionStatusMap.put(pendingTransBuild.getMachineLocalTransSeqNum(), transStatus);
 
-                               for (KeyValue kv : pt.getKVUpdates()) {
-                                       pendingTransSpeculativeTable.put(kv.getKey(), kv);
-                               }
+                       // Add the pending transaction to the queue
+                       pendingTransQueue.add(pendingTransBuild);
 
-                       }
-               }
 
+                       for (int i = lastSeenPendingTransactionSpeculateIndex; i < pendingTransQueue.size(); i++) {
+                               PendingTransaction pt = pendingTransQueue.get(i);
 
-               // Delete since already inserted
-               pendingTransBuild = new PendingTransaction();
+                               if (pt.evaluateGuard(commitedTable, speculativeTable, pendingTransSpeculativeTable)) {
 
-               while (!pendingTransQueue.isEmpty()) {
-                       if (tryput( pendingTransQueue.peek(), false)) {
-                               pendingTransQueue.poll();
+                                       lastSeenPendingTransactionSpeculateIndex = i;
+
+                                       for (KeyValue kv : pt.getKVUpdates()) {
+                                               pendingTransSpeculativeTable.put(kv.getKey(), kv);
+                                       }
+
+                               }
                        }
-               }
-       }
+               } else {
+                       Transaction ut = new Transaction(null,
+                                                        -1,
+                                                        localmachineid,
+                                                        pendingTransBuild.getArbitrator(),
+                                                        pendingTransBuild.getKVUpdates(),
+                                                        pendingTransBuild.getKVGuard());
 
-       public void addKV(IoTString key, IoTString value) {
+                       Pair<Boolean, List<Commit>> retData = doLocalUpdateAndArbitrate(ut, lastCommitSeenSeqNumMap.get(localmachineid));
 
-               if (arbitratorTable.get(key) == null) {
-                       throw new Error("Key not Found.");
+                       if (retData.getFirst()) {
+                               transStatus = new TransactionStatus(TransactionStatus.StatusCommitted, pendingTransBuild.getArbitrator());
+                       } else {
+                               transStatus = new TransactionStatus(TransactionStatus.StatusAborted, pendingTransBuild.getArbitrator());
+                       }
                }
 
-               // Make sure new key value pair matches the current arbitrator
-               if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
-                       // TODO: Maybe not throw en error
-                       throw new Error("Not all Key Values Match Arbitrator.");
+               // Try to insert transactions if possible
+               if (!pendingTransQueue.isEmpty()) {
+                       // We have a pending transaction so do full insertion
+                       processPendingTrans();
+               } else {
+                       try {
+                               // We dont have a pending transaction so do minimal effort
+                               updateWithNotPendingTrans();
+                       } catch (Exception e) {
+                               // Do nothing
+                       }
                }
 
-               KeyValue kv = new KeyValue(key, value);
-               pendingTransBuild.addKV(kv);
+               // reset it so next time is fresh
+               pendingTransBuild = new PendingTransaction();
+
+               return transStatus;
        }
 
-       public void update() throws ServerException {
-               Slot[] newslots = cloud.getSlots(sequencenumber + 1);
-               validateandupdate(newslots, false);
+       public boolean createNewKey(IoTString keyName, long machineId) throws ServerException {
 
-               if (!pendingTransQueue.isEmpty()) {
-                       System.out.println("Full Update");
+               while (true) {
+                       if (arbitratorTable.get(keyName) != null) {
+                               // There is already an arbitrator
+                               return false;
+                       }
 
-                       // We have a pending transaction so do full insertion
+                       if (tryput(keyName, machineId, false)) {
+                               // If successfully inserted
+                               return true;
+                       }
+               }
+       }
+
+       private void processPendingTrans() {
 
+               boolean sentAllPending = false;
+               try {
                        while (!pendingTransQueue.isEmpty()) {
                                if (tryput( pendingTransQueue.peek(), false)) {
                                        pendingTransQueue.poll();
                                }
                        }
-               } else {
-                       // We dont have a pending transaction so do minimal effort
-                       if (uncommittedTransactionsMap.keySet().size() > 0) {
-
-                               boolean doEnd = false;
-                               boolean needResize = false;
-                               while (!doEnd && (uncommittedTransactionsMap.keySet().size() > 0)) {
-                                       boolean resize = needResize;
-                                       needResize = false;
-
-                                       Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
-                                       int newsize = 0;
-                                       if (liveslotcount > resizethreshold) {
-                                               resize = true; //Resize is forced
-                                       }
 
-                                       if (resize) {
-                                               newsize = (int) (numslots * RESIZE_MULTIPLE);
-                                               TableStatus status = new TableStatus(s, newsize);
-                                               s.addEntry(status);
-                                       }
+                       // if got here then all pending transactions were sent
+                       sentAllPending = true;
+               } catch (Exception e) {
+                       // There was a connection error
+                       e.printStackTrace();
+                       sentAllPending = false;
+               }
 
-                                       doRejectedMessages(s);
 
-                                       ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
+               if (!sentAllPending) {
 
-                                       // Resize was needed so redo call
-                                       if (retTup.getFirst()) {
-                                               needResize = true;
-                                               continue;
-                                       }
+                       for (Iterator<PendingTransaction> i = pendingTransQueue.iterator(); i.hasNext(); ) {
+                               PendingTransaction pt = i.next();
+                               LocalComm lc = localCommunicationChannels.get(pt.getArbitrator());
+                               if (lc == null) {
+                                       // Cant talk directly to arbitrator so cant do anything
+                                       continue;
+                               }
 
-                                       // Extract working variables
-                                       boolean seenliveslot = retTup.getSecond();
-                                       long seqn = retTup.getThird();
 
-                                       // Did need to arbitrate
-                                       doEnd = !doArbitration(s);
+                               Transaction ut = new Transaction(null,
+                                                                -1,
+                                                                localmachineid,
+                                                                pendingTransBuild.getArbitrator(),
+                                                                pendingTransBuild.getKVUpdates(),
+                                                                pendingTransBuild.getKVGuard());
 
-                                       doOptionalRescue(s, seenliveslot, seqn, resize);
 
-                                       int max = 0;
-                                       if (resize) {
-                                               max = newsize;
-                                       }
+                               Pair<Boolean, List<Commit>> retData = sendTransactionToLocal(ut, lc);
+
+                               for (Commit commit : retData.getSecond()) {
+                                       // Prepare to process the commit
+                                       processEntry(commit);
+                               }
+
+                               boolean didCommitOrSpeculate = proccessAllNewCommits();
+
+                               // Go through all uncommitted transactions and kill the ones that are dead
+                               deleteDeadUncommittedTransactions();
 
-                                       Slot[] array = cloud.putSlot(s, max);
-                                       if (array == null) {
-                                               array = new Slot[] {s};
-                                               rejectedmessagelist.clear();
-                                       }       else {
-                                               if (array.length == 0)
-                                                       throw new Error("Server Error: Did not send any slots");
-                                               rejectedmessagelist.add(s.getSequenceNumber());
-                                               doEnd = false;
+                               // Speculate on key value pairs
+                               didCommitOrSpeculate |= createSpeculativeTable();
+                               createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
+
+
+                               if (retData.getFirst()) {
+                                       TransactionStatus transStatus = transactionStatusMap.remove(pendingTransBuild.getMachineLocalTransSeqNum());
+                                       if (transStatus != null) {
+                                               transStatus.setStatus(TransactionStatus.StatusCommitted);
                                        }
 
-                                       /* update data structure */
-                                       validateandupdate(array, true);
+                               } else {
+                                       TransactionStatus transStatus = transactionStatusMap.remove(pendingTransBuild.getMachineLocalTransSeqNum());
+                                       if (transStatus != null) {
+                                               transStatus.setStatus(TransactionStatus.StatusAborted);
+                                       }
                                }
                        }
                }
        }
 
-       public boolean createNewKey(IoTString keyName, long machineId) throws ServerException {
+       private void updateWithNotPendingTrans() throws ServerException {
 
-               while (true) {
-                       if (arbitratorTable.get(keyName) != null) {
-                               // There is already an arbitrator
-                               return false;
+               boolean doEnd = false;
+               boolean needResize = false;
+               while (!doEnd && ((uncommittedTransactionsMap.keySet().size() > 0)  || (pendingCommitsList.size() > 0))   ) {
+                       boolean resize = needResize;
+                       needResize = false;
+
+                       Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
+                       int newsize = 0;
+                       if (liveslotcount > resizethreshold) {
+                               resize = true; //Resize is forced
                        }
 
-                       if (tryput(keyName, machineId, false)) {
-                               // If successfully inserted
-                               return true;
+                       if (resize) {
+                               newsize = (int) (numslots * RESIZE_MULTIPLE);
+                               TableStatus status = new TableStatus(s, newsize);
+                               s.addEntry(status);
+                       }
+
+                       doRejectedMessages(s);
+
+                       ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
+
+                       // Resize was needed so redo call
+                       if (retTup.getFirst()) {
+                               needResize = true;
+                               continue;
                        }
+
+                       // Extract working variables
+                       boolean seenliveslot = retTup.getSecond();
+                       long seqn = retTup.getThird();
+
+                       // Did need to arbitrate
+                       doEnd = !doArbitration(s);
+
+                       doOptionalRescue(s, seenliveslot, seqn, resize);
+
+                       int max = 0;
+                       if (resize) {
+                               max = newsize;
+                       }
+
+                       Slot[] array = cloud.putSlot(s, max);
+                       if (array == null) {
+                               array = new Slot[] {s};
+                               rejectedmessagelist.clear();
+
+                               // Delete pending commits that were sent to the cloud
+                               deletePendingCommits();
+
+                       }       else {
+                               if (array.length == 0)
+                                       throw new Error("Server Error: Did not send any slots");
+                               rejectedmessagelist.add(s.getSequenceNumber());
+                               doEnd = false;
+                       }
+
+                       /* update data structure */
+                       validateandupdate(array, true);
                }
        }
 
+       private Pair<Boolean, List<Commit>> sendTransactionToLocal(Transaction ut, LocalComm lc) {
+
+               // encode the request
+               byte[] array = new byte[Long.BYTES + ut.getSize()];
+               ByteBuffer bbEncode = ByteBuffer.wrap(array);
+               Long lastSeenCommit = lastCommitSeenSeqNumMap.get(ut.getArbitrator());
+               if (lastSeenCommit != null) {
+                       bbEncode.putLong(lastSeenCommit);
+               } else {
+                       bbEncode.putLong(0);
+               }
+               ut.encode(bbEncode);
+
+               byte[] data = lc.sendDataToLocalDevice(ut.getArbitrator(), bbEncode.array());
+
+               // Decode the data
+               ByteBuffer bbDecode = ByteBuffer.wrap(data);
+               boolean didCommit = bbDecode.get() == 1;
+               int numberOfCommites = bbDecode.getInt();
+
+               List<Commit> newCommits = new LinkedList<Commit>();
+               for (int i = 0; i < numberOfCommites; i++ ) {
+                       bbDecode.get();
+                       Commit com = (Commit)Commit.decode(null, bbDecode);
+                       newCommits.add(com);
+               }
+
+               return new Pair<Boolean, List<Commit>>(didCommit, newCommits);
+       }
+
+       public byte[] localCommInput(byte[] data) {
+
+               // Decode the data
+               ByteBuffer bbDecode = ByteBuffer.wrap(data);
+               long lastSeenCommit = bbDecode.getLong();
+               bbDecode.get();
+               Transaction ut = (Transaction)Transaction.decode(null, bbDecode);
+
+               // Do the local update and arbitrate
+               Pair<Boolean, List<Commit>> returnData = doLocalUpdateAndArbitrate(ut, lastSeenCommit);
+
+               // Calculate the size of the response
+               int size = Byte.BYTES + Integer.BYTES;
+               for (Commit com : returnData.getSecond()) {
+                       size += com.getSize();
+               }
+
+               // encode the response
+               byte[] array = new byte[size];
+               ByteBuffer bbEncode = ByteBuffer.wrap(array);
+               if (returnData.getFirst()) {
+                       bbEncode.put((byte)1);
+               } else {
+                       bbEncode.put((byte)0);
+               }
+               bbEncode.putInt(returnData.getSecond().size());
+
+               for (Commit com : returnData.getSecond()) {
+                       com.encode(bbEncode);
+               }
+
+               return bbEncode.array();
+       }
+
+       private Pair<Boolean, List<Commit>> doLocalUpdateAndArbitrate(Transaction ut, Long lastCommitSeen) {
+
+               if (ut.getArbitrator() != localmachineid) {
+                       // We are not the arbitrator for that transaction so the other device is talking to the wrong arbitrator
+                       return null;
+               }
+
+               List<Commit> returnCommits = new ArrayList<Commit>();
+
+               if ((lastCommitSeenSeqNumMap.get(localmachineid) != null) && (lastCommitSeenSeqNumMap.get(localmachineid) > lastCommitSeen)) {
+                       // There is a commit that the other client has not seen yet
+
+                       Map<Long, Commit> cm = commitMap.get(localmachineid);
+                       if (cm != null) {
+
+                               List<Long> commitKeys = new ArrayList<Long>(cm.keySet());
+                               Collections.sort(commitKeys);
+
+
+                               for (int i = (commitKeys.size() - 1); i >= 0; i--) {
+                                       Commit com = cm.get(commitKeys.get(i));
+
+                                       if (com.getSequenceNumber() <= lastCommitSeen) {
+                                               break;
+                                       }
+                                       returnCommits.add((Commit)com.getCopy(null));
+                               }
+                       }
+               }
+
+               if (!ut.evaluateGuard(commitedTable, null)) {
+                       // Guard evaluated as false so return only the commits that the other device has not seen yet
+                       return new Pair<Boolean, List<Commit>>(false, returnCommits);
+               }
+
+               // create the commit
+               Commit commit = new Commit(null,
+                                          -1,
+                                          commitSequenceNumber,
+                                          ut.getArbitrator(),
+                                          ut.getkeyValueUpdateSet());
+               commitSequenceNumber = commitSequenceNumber + 1;
+
+               // Add to the pending commits list
+               pendingCommitsList.add(commit);
+
+               // Add this commit so we can send it back
+               returnCommits.add(commit);
+
+               // Prepare to process the commit
+               processEntry(commit);
+
+               boolean didCommitOrSpeculate = proccessAllNewCommits();
+
+               // Go through all uncommitted transactions and kill the ones that are dead
+               deleteDeadUncommittedTransactions();
+
+               // Speculate on key value pairs
+               didCommitOrSpeculate |= createSpeculativeTable();
+               createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
+
+               return new Pair<Boolean, List<Commit>>(true, returnCommits);
+       }
+
        public void decrementLiveCount() {
                liveslotcount--;
        }
@@ -467,7 +767,23 @@ final public class Table {
                }
 
                doOptionalRescue(s, seenliveslot, seqn, resize);
-               return doSendSlotsAndInsert(s, insertedTrans, resize, newsize);
+               Pair<Boolean, Slot[]> sendRetData = doSendSlots(s, insertedTrans, resize, newsize);
+
+               if (sendRetData.getFirst()) {
+                       // update the status and change what the sequence number is for the
+                       TransactionStatus transStatus = transactionStatusMap.remove(pendingTrans.getMachineLocalTransSeqNum());
+                       transStatus.setStatus(TransactionStatus.StatusSent);
+                       transStatus.setSentTransaction();
+                       transactionStatusMap.put(trans.getSequenceNumber(), transStatus);
+               }
+
+
+               if (sendRetData.getSecond().length != 0) {
+                       // insert into the local block chain
+                       validateandupdate(sendRetData.getSecond(), true);
+               }
+
+               return sendRetData.getFirst();
        }
 
        private boolean tryput(IoTString keyName, long arbMachineid, boolean resize) throws ServerException {
@@ -506,7 +822,14 @@ final public class Table {
                }
 
                doOptionalRescue(s, seenliveslot, seqn, resize);
-               return doSendSlotsAndInsert(s, insertedNewKey, resize, newsize);
+               Pair<Boolean, Slot[]> sendRetData = doSendSlots(s, insertedNewKey, resize, newsize);
+
+               if (sendRetData.getSecond().length != 0) {
+                       // insert into the local block chain
+                       validateandupdate(sendRetData.getSecond(), true);
+               }
+
+               return sendRetData.getFirst();
        }
 
        private void doRejectedMessages(Slot s) {
@@ -588,6 +911,24 @@ final public class Table {
        }
 
        private boolean doArbitration(Slot s) {
+
+               // flag whether we have finished all arbitration
+               boolean stillHasArbitration = false;
+
+               pendingCommitsToDelete.clear();
+
+               // First add queue commits
+               for (Commit commit : pendingCommitsList) {
+                       if (s.hasSpace(commit)) {
+                               s.addEntry(commit);
+                               pendingCommitsToDelete.add(commit);
+                       } else {
+                               // Ran out of space so move on but still not done
+                               stillHasArbitration = true;
+                               return stillHasArbitration;
+                       }
+               }
+
                // Arbitrate
                Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
                List<Long> transSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
@@ -595,7 +936,6 @@ final public class Table {
                // Sort from oldest to newest
                Collections.sort(transSeqNums);
 
-               boolean didNeedArbitration = false;
                for (Long transNum : transSeqNums) {
                        Transaction ut = uncommittedTransactionsMap.get(transNum);
 
@@ -605,7 +945,7 @@ final public class Table {
                        }
 
                        // we did have something to arbitrate on
-                       didNeedArbitration = true;
+                       stillHasArbitration = true;
 
                        Entry newEntry = null;
 
@@ -618,12 +958,20 @@ final public class Table {
                                }
 
                                // create the commit
-                               newEntry = new Commit(s, ut.getSequenceNumber(), ut.getArbitrator(), ut.getkeyValueUpdateSet());
+                               newEntry = new Commit(s,
+                                                     ut.getSequenceNumber(),
+                                                     commitSequenceNumber,
+                                                     ut.getArbitrator(),
+                                                     ut.getkeyValueUpdateSet());
+                               commitSequenceNumber = commitSequenceNumber + 1;
                        } else {
                                // Guard was false
 
                                // create the abort
-                               newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID(), ut.getArbitrator());
+                               newEntry = new Abort(s,
+                                                    ut.getSequenceNumber(),
+                                                    ut.getMachineID(),
+                                                    ut.getArbitrator());
                        }
 
                        if ((newEntry != null) && s.hasSpace(newEntry)) {
@@ -633,7 +981,14 @@ final public class Table {
                        }
                }
 
-               return didNeedArbitration;
+               return stillHasArbitration;
+       }
+
+       private void deletePendingCommits() {
+               for (Commit com : pendingCommitsToDelete) {
+                       pendingCommitsList.remove(com);
+               }
+               pendingCommitsToDelete.clear();
        }
 
        private void  doOptionalRescue(Slot s, boolean seenliveslot, long seqn, boolean resize) {
@@ -665,7 +1020,7 @@ final public class Table {
                }
        }
 
-       private boolean doSendSlotsAndInsert(Slot s, boolean inserted, boolean resize, int newsize)  throws ServerException {
+       private Pair<Boolean, Slot[]> doSendSlots(Slot s, boolean inserted, boolean resize, int newsize)  throws ServerException {
                int max = 0;
                if (resize)
                        max = newsize;
@@ -674,6 +1029,9 @@ final public class Table {
                if (array == null) {
                        array = new Slot[] {s};
                        rejectedmessagelist.clear();
+
+                       // Delete pending commits that were sent to the cloud
+                       deletePendingCommits();
                }       else {
                        // if (array.length == 0)
                        // throw new Error("Server Error: Did not send any slots");
@@ -681,11 +1039,7 @@ final public class Table {
                        inserted = false;
                }
 
-               if (array.length != 0) {
-                       validateandupdate(array, true);
-               }
-
-               return inserted;
+               return new Pair<Boolean, Slot[]>(inserted, array);
        }
 
        private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
@@ -745,79 +1099,112 @@ final public class Table {
                createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
        }
 
-       public boolean proccessAllNewCommits() {
-
+       private boolean proccessAllNewCommits() {
                // Process only if there are commit
                if (newCommitMap.keySet().size() == 0) {
                        return false;
                }
+               boolean didProcessNewCommit = false;
 
-               List<Long> commitSeqNums = new ArrayList<Long>(newCommitMap.keySet());
-
-               // Sort from oldest to newest commit
-               Collections.sort(commitSeqNums);
+               for (Long arb : newCommitMap.keySet()) {
 
-               boolean didProcessNewCommit = false;
+                       List<Long> commitSeqNums = new ArrayList<Long>(newCommitMap.get(arb).keySet());
 
-               // Go through each new commit one by one
-               for (Long entrySeqNum : commitSeqNums) {
-                       Commit entry = newCommitMap.get(entrySeqNum);
+                       // Sort from oldest to newest commit
+                       Collections.sort(commitSeqNums);
 
-                       long lastCommitSeenSeqNum = -1;
+                       // Go through each new commit one by one
+                       for (Long entrySeqNum : commitSeqNums) {
+                               Commit entry = newCommitMap.get(arb).get(entrySeqNum);
 
-                       if (lastCommitSeenSeqNumMap.get(entry.getTransArbitrator()) != null) {
-                               lastCommitSeenSeqNum = lastCommitSeenSeqNumMap.get(entry.getTransArbitrator());
-                       }
+                               long lastCommitSeenSeqNum = -1;
+                               if (lastCommitSeenSeqNumMap.get(entry.getTransArbitrator()) != null) {
+                                       lastCommitSeenSeqNum = lastCommitSeenSeqNumMap.get(entry.getTransArbitrator());
+                               }
 
-                       if (entry.getTransSequenceNumber() <= lastCommitSeenSeqNum) {
+                               if (entry.getSequenceNumber() <= lastCommitSeenSeqNum) {
+                                       Map<Long, Commit> cm = commitMap.get(arb);
+                                       if (cm == null) {
+                                               cm = new HashMap<Long, Commit>();
+                                       }
 
-                               Commit prevCommit = commitMap.put(entry.getTransSequenceNumber(), entry);
+                                       Commit prevCommit = cm.put(entry.getSequenceNumber(), entry);
+                                       commitMap.put(arb, cm);
 
-                               if (prevCommit != null) {
-                                       prevCommit.setDead();
+                                       if (prevCommit != null) {
+                                               prevCommit.setDead();
 
-                                       for (KeyValue kv : prevCommit.getkeyValueUpdateSet()) {
-                                               committedMapByKey.put(kv.getKey(), entry);
+                                               for (KeyValue kv : prevCommit.getkeyValueUpdateSet()) {
+                                                       committedMapByKey.put(kv.getKey(), entry);
+                                               }
                                        }
+
+                                       continue;
                                }
 
-                               continue;
-                       }
+                               Set<Commit> commitsToEditSet = new HashSet<Commit>();
 
-                       Set<Commit> commitsToEditSet = new HashSet<Commit>();
+                               for (KeyValue kv : entry.getkeyValueUpdateSet()) {
+                                       commitsToEditSet.add(committedMapByKey.get(kv.getKey()));
+                               }
 
-                       for (KeyValue kv : entry.getkeyValueUpdateSet()) {
-                               commitsToEditSet.add(committedMapByKey.get(kv.getKey()));
-                       }
+                               commitsToEditSet.remove(null);
 
-                       commitsToEditSet.remove(null);
+                               for (Commit prevCommit : commitsToEditSet) {
 
-                       for (Commit prevCommit : commitsToEditSet) {
+                                       Set<KeyValue> deletedKV = prevCommit.updateLiveKeys(entry.getkeyValueUpdateSet());
 
-                               Set<KeyValue> deletedKV = prevCommit.updateLiveKeys(entry.getkeyValueUpdateSet());
+                                       if (!prevCommit.isLive()) {
+                                               Map<Long, Commit> cm = commitMap.get(arb);
 
-                               if (!prevCommit.isLive()) {
-                                       commitMap.remove(prevCommit.getTransSequenceNumber());
+                                               // remove it from the map so that it can be set as dead
+                                               if (cm != null) {
+                                                       cm.remove(prevCommit.getSequenceNumber());
+                                                       commitMap.put(arb, cm);
+                                               }
+                                       }
                                }
-                       }
 
-                       // Add the new commit
-                       commitMap.put(entry.getTransSequenceNumber(), entry);
-                       lastCommitSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
-                       didProcessNewCommit = true;
+                               // Add the new commit
+                               Map<Long, Commit> cm = commitMap.get(arb);
+                               if (cm == null) {
+                                       cm = new HashMap<Long, Commit>();
+                               }
+                               cm.put(entry.getSequenceNumber(), entry);
+                               commitMap.put(arb, cm);
+
+                               lastCommitSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getSequenceNumber());
+
+                               // set the trans sequence number if we are able to
+                               if (entry.getTransSequenceNumber() != -1) {
+                                       lastCommitSeenTransSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
+                               }
 
-                       // Update the committed table list
-                       for (KeyValue kv : entry.getkeyValueUpdateSet()) {
-                               IoTString key = kv.getKey();
-                               commitedTable.put(key, kv);
+                               didProcessNewCommit = true;
 
-                               committedMapByKey.put(key, entry);
+                               // Update the committed table list
+                               for (KeyValue kv : entry.getkeyValueUpdateSet()) {
+                                       IoTString key = kv.getKey();
+                                       commitedTable.put(key, kv);
+                                       committedMapByKey.put(key, entry);
+                               }
                        }
                }
-
                // Clear the new commits storage so we can use it later
                newCommitMap.clear();
 
+               // go through all saved transactions and update the status of those that can be updated
+               for (Iterator<Map.Entry<Long, TransactionStatus>> i = transactionStatusMap.entrySet().iterator(); i.hasNext();) {
+                       Map.Entry<Long, TransactionStatus> entry = i.next();
+                       long seqnum = entry.getKey();
+                       TransactionStatus status = entry.getValue();
+
+                       if ( status.getSentTransaction() && (lastCommitSeenTransSeqNumMap.get(status.getArbitrator()) != null) && (seqnum <= lastCommitSeenTransSeqNumMap.get(status.getArbitrator()))) {
+                               status.setStatus(TransactionStatus.StatusCommitted);
+                               i.remove();
+                       }
+               }
+
                return didProcessNewCommit;
        }
 
@@ -827,8 +1214,11 @@ final public class Table {
                        Transaction prevtrans = i.next().getValue();
                        long transArb = prevtrans.getArbitrator();
 
-                       if ((lastCommitSeenSeqNumMap.get(transArb) != null) && (prevtrans.getSequenceNumber() <= lastCommitSeenSeqNumMap.get(transArb)) ||
-                               (lastAbortSeenSeqNumMap.get(transArb) != null) && (prevtrans.getSequenceNumber() <= lastAbortSeenSeqNumMap.get(transArb))) {
+                       Long commitSeqNum = lastCommitSeenTransSeqNumMap.get(transArb);
+                       Long abortSeqNum = lastAbortSeenSeqNumMap.get(transArb);
+
+                       if (((commitSeqNum != null) && (prevtrans.getSequenceNumber() <= commitSeqNum)) ||
+                               ((abortSeqNum != null) && (prevtrans.getSequenceNumber() <= abortSeqNum))) {
                                i.remove();
                                prevtrans.setDead();
                        }
@@ -942,7 +1332,6 @@ final public class Table {
        private void updateExpectedSize() {
                expectedsize++;
                if (expectedsize > currmaxsize) {
-                       System.out.println("Maxing Out: " + expectedsize + "   " + currmaxsize);
                        expectedsize = currmaxsize;
                }
        }
@@ -1019,7 +1408,7 @@ final public class Table {
        private void processEntry(Transaction entry) {
 
                long arb = entry.getArbitrator();
-               Long comLast = lastCommitSeenSeqNumMap.get(arb);
+               Long comLast = lastCommitSeenTransSeqNumMap.get(arb);
                Long abLast = lastAbortSeenSeqNumMap.get(arb);
 
                Transaction prevTrans = null;
@@ -1039,7 +1428,6 @@ final public class Table {
        }
 
        private void processEntry(Abort entry) {
-
                if (lastmessagetable.get(entry.getMachineID()).getFirst() < entry.getTransSequenceNumber()) {
                        // Abort has not been seen yet so we need to keep track of it
 
@@ -1047,18 +1435,32 @@ final public class Table {
                        if (prevAbort != null) {
                                prevAbort.setDead(); // delete old version of the duplicate
                        }
+
+                       if ((lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()) != null) &&   (entry.getTransSequenceNumber() > lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()))) {
+                               lastAbortSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
+                       }
                } else {
                        // The machine already saw this so it is dead
                        entry.setDead();
                }
 
-               if ((lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()) != null) &&   (entry.getTransSequenceNumber() > lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()))) {
-                       lastAbortSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
+               // Update the status of the transaction and remove it since we are done with this transaction
+               TransactionStatus status = transactionStatusMap.remove(entry.getTransSequenceNumber());
+               if (status != null) {
+                       status.setStatus(TransactionStatus.StatusAborted);
                }
        }
 
-       private void processEntry(Commit entry, Slot s) {
-               Commit prevCommit = newCommitMap.put(entry.getTransSequenceNumber(), entry);
+       private void processEntry(Commit entry) {
+               Map<Long, Commit> arbMap = newCommitMap.get(entry.getTransArbitrator());
+
+               if (arbMap == null) {
+                       arbMap = new HashMap<Long, Commit>();
+               }
+
+               Commit prevCommit = arbMap.put(entry.getSequenceNumber(), entry);
+               newCommitMap.put(entry.getTransArbitrator(), arbMap);
+
                if (prevCommit != null) {
                        prevCommit.setDead();
                }
@@ -1078,8 +1480,6 @@ final public class Table {
                if ((largestTableStatusSeen == -1) || (newnumslots > largestTableStatusSeen)) {
                        largestTableStatusSeen = newnumslots;
                }
-
-               // System.out.println("Table Stat: " + newnumslots + "   large: " + largestTableStatusSeen + "   small: " + smallestTableStatusSeen);
        }
 
        private void addWatchList(long machineid, RejectedMessage entry) {
@@ -1161,7 +1561,7 @@ final public class Table {
                                break;
 
                        case Entry.TypeCommit:
-                               processEntry((Commit)entry, slot);
+                               processEntry((Commit)entry);
                                break;
 
                        case Entry.TypeAbort: