Commits working, Transactions Working, Arbitrations working, Still needs a lot of...
[iotcloud.git] / version2 / src / java / iotcloud / Table.java
index cf21f48d29d81324249559f08cd16a7d37f4fc1b..b0b4c1a7da1788e92154aab4d09071ded3178b38 100644 (file)
@@ -8,9 +8,12 @@ import java.util.Vector;
 import java.util.Random;
 import java.util.Queue;
 import java.util.LinkedList;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.Collection;
+import java.util.Collections;
+
 
 /**
  * IoTTable data structure.  Provides client inferface.
@@ -36,24 +39,30 @@ final public class Table {
        private TableStatus lastTableStatus;
        static final int FREE_SLOTS = 10; //number of slots that should be kept free
        static final int SKIP_THRESHOLD = 10;
-       private long liveslotcount = 0;
+       public long liveslotcount = 0;  // TODO:  MAKE PRIVATE
        private int chance;
        static final double RESIZE_MULTIPLE = 1.2;
        static final double RESIZE_THRESHOLD = 0.75;
        static final int REJECTED_THRESHOLD = 5;
-       private int resizethreshold;
+       public int resizethreshold;     // TODO:  MAKE PRIVATE
        private long lastliveslotseqn;  //smallest sequence number with a live entry
        private Random random = new Random();
+       private long lastCommitSeenSeqNum = 0; // sequence number of the last commit that was seen
 
        private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
        private Queue<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
        private List<Commit> commitList = null; // List of all the most recent live commits
+       private List<Long> commitListSeqNum = null; // List of all the most recent live commits trans sequence numbers
+
        private Set<Abort> abortSet = null; // Set of the live aborts
-       private Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV
+       public  Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV       TODO: Make Private
        private Map<IoTString, KeyValue> speculativeTable = null; // Table of speculative KV
-       private List<Transaction> uncommittedTransactionsList = null; //
+       public Map<Long, Transaction> uncommittedTransactionsMap = null; // TODO: make private
        private Map<IoTString, Long> arbitratorTable = null; // Table of arbitrators
+       private Map<IoTString, NewKey> newKeyTable = null; // Table of speculative KV
        // private Set<Abort> arbitratorTable = null; // Table of arbitrators
+       private Map<Long, Commit> newCommitMap = null; // Map of all the new commits
+
 
 
        public Table(String baseurl, String password, long _localmachineid) {
@@ -65,13 +74,7 @@ final public class Table {
                cloud = new CloudComm(this, baseurl, password);
                lastliveslotseqn = 1;
 
-               pendingTransQueue = new LinkedList<PendingTransaction>();
-               commitList = new LinkedList<Commit>();
-               abortSet = new HashSet<Abort>();
-               commitedTable = new HashMap<IoTString, KeyValue>();
-               speculativeTable = new HashMap<IoTString, KeyValue>();
-               uncommittedTransactionsList = new LinkedList<Transaction>();
-               arbitratorTable = new HashMap<IoTString, Long>();
+               setupDataStructs();
        }
 
        public Table(CloudComm _cloud, long _localmachineid) {
@@ -82,13 +85,19 @@ final public class Table {
                sequencenumber = 0;
                cloud = _cloud;
 
+               setupDataStructs();
+       }
+
+       private void setupDataStructs() {
                pendingTransQueue = new LinkedList<PendingTransaction>();
                commitList = new LinkedList<Commit>();
                abortSet = new HashSet<Abort>();
                commitedTable = new HashMap<IoTString, KeyValue>();
                speculativeTable = new HashMap<IoTString, KeyValue>();
-               uncommittedTransactionsList = new LinkedList<Transaction>();
+               uncommittedTransactionsMap = new HashMap<Long, Transaction>();
                arbitratorTable = new HashMap<IoTString, Long>();
+               newKeyTable = new HashMap<IoTString, NewKey>();
+               newCommitMap = new HashMap<Long, Commit> ();
        }
 
        public void rebuild() {
@@ -96,7 +105,45 @@ final public class Table {
                validateandupdate(newslots, true);
        }
 
+       // TODO: delete method
+       public void printSlots() {
+               long o = buffer.getOldestSeqNum();
+               long n = buffer.getNewestSeqNum();
 
+               int[] types = new int[10];
+
+               int num = 0;
+
+               int livec = 0;
+               int deadc = 0;
+               for (long i = o; i < (n + 1); i++) {
+                       Slot s = buffer.getSlot(i);
+
+                       Vector<Entry> entries = s.getEntries();
+
+                       for (Entry e : entries) {
+                               if (e.isLive()) {
+                                       int type = e.getType();
+                                       types[type] = types[type] + 1;
+                                       num++;
+                                       livec++;
+                               } else {
+                                       deadc++;
+                               }
+                       }
+               }
+
+               for (int i = 0; i < 10; i++) {
+                       System.out.println(i + "    " + types[i]);
+               }
+               System.out.println("Live count:   " + livec);
+               System.out.println("Dead count:   " + deadc);
+               System.out.println("Old:   " + o);
+               System.out.println("New:   " + n);
+               System.out.println("Size:   " + buffer.size());
+               System.out.println("Commits Map:   " + commitedTable.size());
+               System.out.println("Commits List:   " + commitList.size());
+       }
 
        public IoTString getCommitted(IoTString key) {
                KeyValue kv = commitedTable.get(key);
@@ -116,7 +163,6 @@ final public class Table {
                }
        }
 
-
        public void initTable() {
                cloud.setSalt();//Set the salt
                Slot s = new Slot(this, 1, localmachineid);
@@ -133,8 +179,6 @@ final public class Table {
        }
 
        public String toString() {
-
-
                String retString = " Committed Table: \n";
                retString += "---------------------------\n";
                retString += commitedTable.toString();
@@ -148,11 +192,6 @@ final public class Table {
                return retString;
        }
 
-
-
-
-
-
        public void startTransaction() {
                // Create a new transaction, invalidates any old pending transactions.
                pendingTransBuild = new PendingTransaction();
@@ -177,14 +216,16 @@ final public class Table {
 
        public void addKV(IoTString key, IoTString value) {
 
+               if (arbitratorTable.get(key) == null) {
+                       throw new Error("Key not Found.");
+               }
+
                // Make sure new key value pair matches the current arbitrator
                if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
-                       // TODO: Maybe not throw and error
-                       throw new Error("Not all Key Values match");
+                       // TODO: Maybe not throw en error
+                       throw new Error("Not all Key Values Match.");
                }
 
-
-
                KeyValue kv = new KeyValue(key, value);
                pendingTransBuild.addKV(kv);
        }
@@ -194,25 +235,16 @@ final public class Table {
        }
 
        public void update() {
+
                Slot[] newslots = cloud.getSlots(sequencenumber + 1);
 
                validateandupdate(newslots, false);
 
-               if (uncommittedTransactionsList.size() > 0) {
-                       List<Transaction> uncommittedTransArb = new LinkedList<Transaction>();
-
-                       for (Transaction ut : uncommittedTransactionsList) {
-                               KeyValue kv = (KeyValue)(ut.getkeyValueUpdateSet().toArray()[0]);
-                               long arb = arbitratorTable.get(kv.getKey());
-
-                               if (arb == localmachineid) {
-                                       uncommittedTransArb.add(ut);
-                               }
-                       }
-
+               if (uncommittedTransactionsMap.keySet().size() > 0) {
 
+                       boolean doEnd = false;
                        boolean needResize = false;
-                       while (uncommittedTransArb.size() > 0) {
+                       while (!doEnd && (uncommittedTransactionsMap.keySet().size() > 0)) {
                                boolean resize = needResize;
                                needResize = false;
 
@@ -228,156 +260,30 @@ final public class Table {
                                        s.addEntry(status);
                                }
 
-                               if (! rejectedmessagelist.isEmpty()) {
-                                       /* TODO: We should avoid generating a rejected message entry if
-                                        * there is already a sufficient entry in the queue (e.g.,
-                                        * equalsto value of true and same sequence number).  */
-
-                                       long old_seqn = rejectedmessagelist.firstElement();
-                                       if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
-                                               long new_seqn = rejectedmessagelist.lastElement();
-                                               RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
-                                               s.addEntry(rm);
-                                       } else {
-                                               long prev_seqn = -1;
-                                               int i = 0;
-                                               /* Go through list of missing messages */
-                                               for (; i < rejectedmessagelist.size(); i++) {
-                                                       long curr_seqn = rejectedmessagelist.get(i);
-                                                       Slot s_msg = buffer.getSlot(curr_seqn);
-                                                       if (s_msg != null)
-                                                               break;
-                                                       prev_seqn = curr_seqn;
-                                               }
-                                               /* Generate rejected message entry for missing messages */
-                                               if (prev_seqn != -1) {
-                                                       RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
-                                                       s.addEntry(rm);
-                                               }
-                                               /* Generate rejected message entries for present messages */
-                                               for (; i < rejectedmessagelist.size(); i++) {
-                                                       long curr_seqn = rejectedmessagelist.get(i);
-                                                       Slot s_msg = buffer.getSlot(curr_seqn);
-                                                       long machineid = s_msg.getMachineID();
-                                                       RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
-                                                       s.addEntry(rm);
-                                               }
-                                       }
-                               }
-
-                               long newestseqnum = buffer.getNewestSeqNum();
-                               long oldestseqnum = buffer.getOldestSeqNum();
-                               if (lastliveslotseqn < oldestseqnum) {
-                                       lastliveslotseqn = oldestseqnum;
-                               }
+                               doRejectedMessages(s);
 
-                               long seqn = lastliveslotseqn;
-                               boolean seenliveslot = false;
-                               long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
-                               long threshold = firstiffull + FREE_SLOTS;      //we want the buffer to be clear of live entries up to this point
-
-                               boolean tryAgain = false;
-
-                               // Mandatory Rescue
-                               for (; seqn < threshold; seqn++) {
-                                       Slot prevslot = buffer.getSlot(seqn);
-                                       //Push slot number forward
-                                       if (! seenliveslot)
-                                               lastliveslotseqn = seqn;
-
-                                       if (! prevslot.isLive())
-                                               continue;
-                                       seenliveslot = true;
-                                       Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
-                                       for (Entry liveentry : liveentries) {
-                                               if (s.hasSpace(liveentry)) {
-                                                       s.addEntry(liveentry);
-                                               } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
-                                                       if (!resize) {
-                                                               System.out.print("B"); //?
-                                                               tryAgain = true;
-                                                               needResize = true;
-                                                       }
-                                               }
-                                       }
-                               }
+                               ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
 
-                               if (tryAgain) {
+                               // Resize was needed so redo call
+                               if (retTup.getFirst()) {
+                                       needResize = true;
                                        continue;
                                }
 
-                               // Arbitrate
-                               Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
-                               for (Iterator<Transaction> i = uncommittedTransArb.iterator(); i.hasNext();) {
-                                       Transaction ut = i.next();
-
-                                       KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
-                                       // Check if this machine arbitrates for this transaction
-                                       if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
-                                               continue;
-                                       }
+                               // Extract working variables
+                               boolean seenliveslot = retTup.getSecond();
+                               long seqn = retTup.getThird();
 
-                                       Entry newEntry = null;
+                               // Did need to arbitrate
+                               doEnd = !doArbitration(s);
 
-                                       try {
-                                               if ( ut.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
-                                                       // Guard evaluated as true
-
-                                                       // update the local tmp current key set
-                                                       for (KeyValue kv : ut.getkeyValueUpdateSet()) {
-                                                               speculativeTableTmp.put(kv.getKey(), kv);
-                                                       }
-
-                                                       // create the commit
-                                                       newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet());
-                                               } else {
-                                                       // Guard was false
-
-                                                       // create the abort
-                                                       newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID());
-                                               }
-                                       } catch (Exception e) {
-                                               e.printStackTrace();
-                                       }
-
-                                       if ((newEntry != null) && s.hasSpace(newEntry)) {
-                                               s.addEntry(newEntry);
-                                               i.remove();
-
-                                       } else {
-                                               break;
-                                       }
-                               }
-
-                               /* now go through live entries from least to greatest sequence number until
-                                * either all live slots added, or the slot doesn't have enough room
-                                * for SKIP_THRESHOLD consecutive entries*/
-                               int skipcount = 0;
-                               search:
-                               for (; seqn <= newestseqnum; seqn++) {
-                                       Slot prevslot = buffer.getSlot(seqn);
-                                       //Push slot number forward
-                                       if (!seenliveslot)
-                                               lastliveslotseqn = seqn;
-
-                                       if (!prevslot.isLive())
-                                               continue;
-                                       seenliveslot = true;
-                                       Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
-                                       for (Entry liveentry : liveentries) {
-                                               if (s.hasSpace(liveentry))
-                                                       s.addEntry(liveentry);
-                                               else {
-                                                       skipcount++;
-                                                       if (skipcount > SKIP_THRESHOLD)
-                                                               break search;
-                                               }
-                                       }
-                               }
+                               doOptionalRescue(s, seenliveslot, seqn, resize);
 
                                int max = 0;
-                               if (resize)
+                               if (resize) {
                                        max = newsize;
+                               }
+
                                Slot[] array = cloud.putSlot(s, max);
                                if (array == null) {
                                        array = new Slot[] {s};
@@ -386,13 +292,12 @@ final public class Table {
                                        if (array.length == 0)
                                                throw new Error("Server Error: Did not send any slots");
                                        rejectedmessagelist.add(s.getSequenceNumber());
+                                       doEnd = false;
                                }
 
                                /* update data structure */
                                validateandupdate(array, true);
                        }
-
-
                }
        }
 
@@ -412,11 +317,11 @@ final public class Table {
                }
        }
 
-       void decrementLiveCount() {
+       public void decrementLiveCount() {
                liveslotcount--;
+               // System.out.println("Decrement Live Count");
        }
 
-
        private void setResizeThreshold() {
                int resize_lower = (int) (RESIZE_THRESHOLD * numslots);
                resizethreshold = resize_lower - 1 + random.nextInt(numslots - resize_lower);
@@ -424,128 +329,39 @@ final public class Table {
 
        private boolean tryput(PendingTransaction pendingTrans, boolean resize) {
                Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
+
                int newsize = 0;
                if (liveslotcount > resizethreshold) {
                        resize = true; //Resize is forced
+                       System.out.println("Live count resize: " + liveslotcount + "   " + resizethreshold);
+
                }
 
                if (resize) {
                        newsize = (int) (numslots * RESIZE_MULTIPLE);
-                       TableStatus status = new TableStatus(s, newsize);
-                       s.addEntry(status);
-               }
 
-               if (! rejectedmessagelist.isEmpty()) {
-                       /* TODO: We should avoid generating a rejected message entry if
-                        * there is already a sufficient entry in the queue (e.g.,
-                        * equalsto value of true and same sequence number).  */
+                       System.out.println("New Size:  " + newsize + "  old: " + buffer.oldestseqn); // TODO remove
 
-                       long old_seqn = rejectedmessagelist.firstElement();
-                       if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
-                               long new_seqn = rejectedmessagelist.lastElement();
-                               RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
-                               s.addEntry(rm);
-                       } else {
-                               long prev_seqn = -1;
-                               int i = 0;
-                               /* Go through list of missing messages */
-                               for (; i < rejectedmessagelist.size(); i++) {
-                                       long curr_seqn = rejectedmessagelist.get(i);
-                                       Slot s_msg = buffer.getSlot(curr_seqn);
-                                       if (s_msg != null)
-                                               break;
-                                       prev_seqn = curr_seqn;
-                               }
-                               /* Generate rejected message entry for missing messages */
-                               if (prev_seqn != -1) {
-                                       RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
-                                       s.addEntry(rm);
-                               }
-                               /* Generate rejected message entries for present messages */
-                               for (; i < rejectedmessagelist.size(); i++) {
-                                       long curr_seqn = rejectedmessagelist.get(i);
-                                       Slot s_msg = buffer.getSlot(curr_seqn);
-                                       long machineid = s_msg.getMachineID();
-                                       RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
-                                       s.addEntry(rm);
-                               }
-                       }
+                       TableStatus status = new TableStatus(s, newsize);
+                       s.addEntry(status);
                }
 
-               long newestseqnum = buffer.getNewestSeqNum();
-               long oldestseqnum = buffer.getOldestSeqNum();
-               if (lastliveslotseqn < oldestseqnum)
-                       lastliveslotseqn = oldestseqnum;
+               doRejectedMessages(s);
 
-               long seqn = lastliveslotseqn;
-               boolean seenliveslot = false;
-               long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
-               long threshold = firstiffull + FREE_SLOTS;      //we want the buffer to be clear of live entries up to this point
 
 
-               // Mandatory Rescue
-               for (; seqn < threshold; seqn++) {
-                       Slot prevslot = buffer.getSlot(seqn);
-                       //Push slot number forward
-                       if (! seenliveslot)
-                               lastliveslotseqn = seqn;
+               ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
 
-                       if (! prevslot.isLive())
-                               continue;
-                       seenliveslot = true;
-                       Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
-                       for (Entry liveentry : liveentries) {
-                               if (s.hasSpace(liveentry)) {
-                                       s.addEntry(liveentry);
-                               } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
-                                       if (!resize) {
-                                               System.out.print("B"); //?
-                                               return tryput(pendingTrans, true);
-                                       }
-                               }
-                       }
+               // Resize was needed so redo call
+               if (retTup.getFirst()) {
+                       return tryput(pendingTrans, true);
                }
 
+               // Extract working variables
+               boolean seenliveslot = retTup.getSecond();
+               long seqn = retTup.getThird();
 
-               // Arbitrate
-               Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
-               for (Transaction ut : uncommittedTransactionsList) {
-
-                       KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
-                       // Check if this machine arbitrates for this transaction
-                       if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
-                               continue;
-                       }
-
-                       Entry newEntry = null;
-
-                       try {
-                               if ( ut.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
-                                       // Guard evaluated as true
-
-                                       // update the local tmp current key set
-                                       for (KeyValue kv : ut.getkeyValueUpdateSet()) {
-                                               speculativeTableTmp.put(kv.getKey(), kv);
-                                       }
-
-                                       // create the commit
-                                       newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet());
-                               } else {
-                                       // Guard was false
-
-                                       // create the abort
-                                       newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID());
-                               }
-                       } catch (Exception e) {
-                               e.printStackTrace();
-                       }
-
-                       if ((newEntry != null) && s.hasSpace(newEntry)) {
-                               s.addEntry(newEntry);
-                       } else {
-                               break;
-                       }
-               }
+               doArbitration(s);
 
                Transaction trans = new Transaction(s,
                                                    s.getSequenceNumber(),
@@ -558,49 +374,13 @@ final public class Table {
                        insertedTrans = true;
                }
 
-               /* now go through live entries from least to greatest sequence number until
-                * either all live slots added, or the slot doesn't have enough room
-                * for SKIP_THRESHOLD consecutive entries*/
-               int skipcount = 0;
-               search:
-               for (; seqn <= newestseqnum; seqn++) {
-                       Slot prevslot = buffer.getSlot(seqn);
-                       //Push slot number forward
-                       if (!seenliveslot)
-                               lastliveslotseqn = seqn;
-
-                       if (!prevslot.isLive())
-                               continue;
-                       seenliveslot = true;
-                       Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
-                       for (Entry liveentry : liveentries) {
-                               if (s.hasSpace(liveentry))
-                                       s.addEntry(liveentry);
-                               else {
-                                       skipcount++;
-                                       if (skipcount > SKIP_THRESHOLD)
-                                               break search;
-                               }
-                       }
-               }
+               doOptionalRescue(s, seenliveslot, seqn, resize);
+               insertedTrans = doSendSlotsAndInsert(s, insertedTrans, resize, newsize);
 
-               int max = 0;
-               if (resize)
-                       max = newsize;
-               Slot[] array = cloud.putSlot(s, max);
-               if (array == null) {
-                       array = new Slot[] {s};
-                       rejectedmessagelist.clear();
-               }       else {
-                       if (array.length == 0)
-                               throw new Error("Server Error: Did not send any slots");
-                       rejectedmessagelist.add(s.getSequenceNumber());
-                       insertedTrans = false;
+               if (insertedTrans) {
+                       // System.out.println("Inserted: " + trans.getSequenceNumber());
                }
 
-               /* update data structure */
-               validateandupdate(array, true);
-
                return insertedTrans;
        }
 
@@ -617,6 +397,34 @@ final public class Table {
                        s.addEntry(status);
                }
 
+               doRejectedMessages(s);
+               ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
+
+               // Resize was needed so redo call
+               if (retTup.getFirst()) {
+                       return tryput(keyName, arbMachineid, true);
+               }
+
+               // Extract working variables
+               boolean seenliveslot = retTup.getSecond();
+               long seqn = retTup.getThird();
+
+
+               doArbitration(s);
+
+               NewKey newKey = new NewKey(s, keyName, arbMachineid);
+
+               boolean insertedNewKey = false;
+               if (s.hasSpace(newKey)) {
+                       s.addEntry(newKey);
+                       insertedNewKey = true;
+               }
+
+               doOptionalRescue(s, seenliveslot, seqn, resize);
+               return doSendSlotsAndInsert(s, insertedNewKey, resize, newsize);
+       }
+
+       private void doRejectedMessages(Slot s) {
                if (! rejectedmessagelist.isEmpty()) {
                        /* TODO: We should avoid generating a rejected message entry if
                         * there is already a sufficient entry in the queue (e.g.,
@@ -653,7 +461,9 @@ final public class Table {
                                }
                        }
                }
+       }
 
+       private ThreeTuple<Boolean, Boolean, Long> doMandatoryResuce(Slot s, boolean resize) {
                long newestseqnum = buffer.getNewestSeqNum();
                long oldestseqnum = buffer.getOldestSeqNum();
                if (lastliveslotseqn < oldestseqnum)
@@ -661,14 +471,14 @@ final public class Table {
 
                long seqn = lastliveslotseqn;
                boolean seenliveslot = false;
-               long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
-               long threshold = firstiffull + FREE_SLOTS;      //we want the buffer to be clear of live entries up to this point
+               long firstiffull = newestseqnum + 1 - numslots; // smallest seq number in the buffer if it is full
+               long threshold = firstiffull + FREE_SLOTS;      // we want the buffer to be clear of live entries up to this point
 
 
                // Mandatory Rescue
                for (; seqn < threshold; seqn++) {
                        Slot prevslot = buffer.getSlot(seqn);
-                       //Push slot number forward
+                       // Push slot number forward
                        if (! seenliveslot)
                                lastliveslotseqn = seqn;
 
@@ -681,70 +491,81 @@ final public class Table {
                                        s.addEntry(liveentry);
                                } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
                                        if (!resize) {
-                                               System.out.print("B"); //?
-                                               return tryput(keyName, arbMachineid, true);
+                                               System.out.println("B"); //?
+
+                                               System.out.println("==============================NEEEEDDDD RESIZING");
+                                               return new ThreeTuple<Boolean, Boolean, Long>(true, seenliveslot, seqn);
                                        }
                                }
                        }
                }
 
+               // Did not resize
+               return new ThreeTuple<Boolean, Boolean, Long>(false, seenliveslot, seqn);
+       }
 
-               // // Arbitrate
-               // Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
-               // for (Transaction ut : uncommittedTransactionsList) {
+       private boolean doArbitration(Slot s) {
+               // Arbitrate
+               Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
 
-               //      KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
-               //      // Check if this machine arbitrates for this transaction
-               //      if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
-               //              continue;
-               //      }
+               List<Long> transSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
 
-               //      Entry newEntry = null;
+               // Sort from oldest to newest
+               Collections.sort(transSeqNums);
 
-               //      try {
-               //              if ( ut.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
-               //                      // Guard evaluated as true
 
-               //                      // update the local tmp current key set
-               //                      for (KeyValue kv : ut.getkeyValueUpdateSet()) {
-               //                              speculativeTableTmp.put(kv.getKey(), kv);
-               //                      }
+               boolean didNeedArbitration = false;
+               for (Long transNum : transSeqNums) {
+                       Transaction ut = uncommittedTransactionsMap.get(transNum);
 
-               //                      // create the commit
-               //                      newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet());
-               //              } else {
-               //                      // Guard was false
+                       KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
+                       // Check if this machine arbitrates for this transaction
+                       if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
+                               continue;
+                       }
 
-               //                      // create the abort
-               //                      newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID());
-               //              }
-               //      } catch (Exception e) {
-               //              e.printStackTrace();
-               //      }
+                       // we did have something to arbitrate on
+                       didNeedArbitration = true;
 
-               //      if ((newEntry != null) && s.hasSpace(newEntry)) {
+                       Entry newEntry = null;
 
-               //              // TODO: Remove print
-               //              System.out.println("Arbitrating...");
-               //              s.addEntry(newEntry);
-               //      } else {
-               //              break;
-               //      }
-               // }
+                       try {
+                               if ( ut.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
+                                       // Guard evaluated as true
 
+                                       // update the local tmp current key set
+                                       for (KeyValue kv : ut.getkeyValueUpdateSet()) {
+                                               speculativeTableTmp.put(kv.getKey(), kv);
+                                       }
 
-               NewKey newKey = new NewKey(s, keyName, arbMachineid);
+                                       // create the commit
+                                       newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet());
+                               } else {
+                                       // Guard was false
 
-               boolean insertedNewKey = false;
-               if (s.hasSpace(newKey)) {
-                       s.addEntry(newKey);
-                       insertedNewKey = true;
+                                       // create the abort
+                                       newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID());
+                               }
+                       } catch (Exception e) {
+                               e.printStackTrace();
+                       }
+
+                       if ((newEntry != null) && s.hasSpace(newEntry)) {
+                               s.addEntry(newEntry);
+                       } else {
+                               break;
+                       }
                }
 
+               return didNeedArbitration;
+       }
+
+       private void  doOptionalRescue(Slot s, boolean seenliveslot, long seqn, boolean resize) {
                /* now go through live entries from least to greatest sequence number until
                 * either all live slots added, or the slot doesn't have enough room
                 * for SKIP_THRESHOLD consecutive entries*/
                int skipcount = 0;
+               long newestseqnum = buffer.getNewestSeqNum();
                search:
                for (; seqn <= newestseqnum; seqn++) {
                        Slot prevslot = buffer.getSlot(seqn);
@@ -766,7 +587,9 @@ final public class Table {
                                }
                        }
                }
+       }
 
+       private boolean doSendSlotsAndInsert(Slot s, boolean inserted, boolean resize, int newsize) {
                int max = 0;
                if (resize)
                        max = newsize;
@@ -778,13 +601,13 @@ final public class Table {
                        if (array.length == 0)
                                throw new Error("Server Error: Did not send any slots");
                        rejectedmessagelist.add(s.getSequenceNumber());
-                       insertedNewKey = false;
+                       inserted = false;
                }
 
                /* update data structure */
                validateandupdate(array, true);
 
-               return insertedNewKey;
+               return inserted;
        }
 
        private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
@@ -808,6 +631,8 @@ final public class Table {
                        updateExpectedSize();
                }
 
+               proccessAllNewCommits();
+
                /* If there is a gap, check to see if the server sent us everything. */
                if (firstseqnum != (sequencenumber + 1)) {
 
@@ -831,10 +656,87 @@ final public class Table {
                createSpeculativeTable();
        }
 
+       public void proccessAllNewCommits() {
+
+               // Process only if there are commit
+               if (newCommitMap.keySet().size() == 0) {
+                       return;
+               }
+
+               List<Long> commitSeqNums = new ArrayList<Long>(newCommitMap.keySet());
+
+               // Sort from oldest to newest commit
+               Collections.sort(commitSeqNums);
+
+               // Go through each new commit one by one
+               for (Long entrySeqNum : commitSeqNums) {
+                       Commit entry = newCommitMap.get(entrySeqNum);
+
+                       if (entry.getTransSequenceNumber() <= lastCommitSeenSeqNum) {
+
+                               // Remove any old commits
+                               for (Iterator<Commit> i = commitList.iterator(); i.hasNext();) {
+                                       Commit prevcommit = i.next();
+
+                                       if (entry.getTransSequenceNumber() == prevcommit.getTransSequenceNumber()) {
+                                               prevcommit.setDead();
+                                               i.remove();
+                                       }
+                               }
+                               commitList.add(entry);
+                               continue;
+                       }
+
+                       // Remove any old commits
+                       for (Iterator<Commit> i = commitList.iterator(); i.hasNext();) {
+                               Commit prevcommit = i.next();
+                               prevcommit.updateLiveKeys(entry.getkeyValueUpdateSet());
+
+                               if (!prevcommit.isLive()) {
+                                       i.remove();
+                               }
+                       }
+
+                       // Add the new commit
+                       commitList.add(entry);
+                       lastCommitSeenSeqNum = entry.getTransSequenceNumber();
+                       // System.out.println("Last Seq Num: " + lastCommitSeenSeqNum);
+
+
+                       // Update the committed table list
+                       for (KeyValue kv : entry.getkeyValueUpdateSet()) {
+                               IoTString key = kv.getKey();
+                               commitedTable.put(key, kv);
+                       }
+
+                       long committedTransSeq = entry.getTransSequenceNumber();
+
+                       // Make dead the transactions
+                       for (Iterator<Map.Entry<Long, Transaction>> i = uncommittedTransactionsMap.entrySet().iterator(); i.hasNext();) {
+                               Transaction prevtrans = i.next().getValue();
+
+                               if (prevtrans.getSequenceNumber() <= committedTransSeq) {
+                                       i.remove();
+                                       prevtrans.setDead();
+                               }
+                       }
+               }
+
+
+               // Clear the new commits storage so we can use it later
+               newCommitMap.clear();
+       }
+
        private void createSpeculativeTable() {
                Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
+               List<Long> utSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
+
+               // Sort from oldest to newest commit
+               Collections.sort(utSeqNums);
 
-               for (Transaction trans : uncommittedTransactionsList) {
+
+               for (Long key : utSeqNums) {
+                       Transaction trans = uncommittedTransactionsMap.get(key);
 
                        try {
                                if (trans.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
@@ -877,17 +779,15 @@ final public class Table {
        }
 
        private void commitNewMaxSize() {
-               if (numslots != currmaxsize)
+               if (numslots != currmaxsize) {
+                       System.out.println("Resizing the buffer"); // TODO: Remove
                        buffer.resize(currmaxsize);
+               }
 
                numslots = currmaxsize;
                setResizeThreshold();
        }
 
-
-
-
-
        private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
                updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
        }
@@ -929,15 +829,25 @@ final public class Table {
 
        private void processEntry(NewKey entry) {
                arbitratorTable.put(entry.getKey(), entry.getMachineID());
+
+               NewKey oldNewKey = newKeyTable.put(entry.getKey(), entry);
+
+               if (oldNewKey != null) {
+                       oldNewKey.setDead();
+               }
        }
 
        private void processEntry(Transaction entry) {
-               uncommittedTransactionsList.add(entry);
+               Transaction prevTrans = uncommittedTransactionsMap.put(entry.getSequenceNumber(), entry);
+
+               // Duplicate so delete old copy
+               if (prevTrans != null) {
+                       prevTrans.setDead();
+               }
        }
 
        private void processEntry(Abort entry) {
 
-
                if (lastmessagetable.get(entry.getMachineID()).getFirst() < entry.getTransSequenceNumber()) {
                        // Abort has not been seen yet so we need to keep track of it
                        abortSet.add(entry);
@@ -946,50 +856,24 @@ final public class Table {
                        entry.setDead();
                }
 
-               for (Iterator<Transaction> i = uncommittedTransactionsList.iterator(); i.hasNext();) {
-                       Transaction prevtrans = i.next();
-                       if (prevtrans.getSequenceNumber() == entry.getTransSequenceNumber()) {
-                               uncommittedTransactionsList.remove(prevtrans);
-                               prevtrans.setDead();
-                               return;
-                       }
-               }
-       }
-
-       private void processEntry(Commit entry) {
-
-               for (Iterator<Commit> i = commitList.iterator(); i.hasNext();) {
-                       Commit prevcommit = i.next();
-                       prevcommit.updateLiveKeys(entry.getkeyValueUpdateSet());
-
-                       if (!prevcommit.isLive()) {
-                               //commitList.remove(prevcommit);
-                               i.remove();
-                       }
-               }
-
-               commitList.add(entry);
-
-               // Update the committed table list
-               for (KeyValue kv : entry.getkeyValueUpdateSet()) {
-                       IoTString key = kv.getKey();
-                       commitedTable.put(key, kv);
-               }
-
-               long committedTransSeq = entry.getTransSequenceNumber();
-
                // Make dead the transactions
-               for (Iterator<Transaction> i = uncommittedTransactionsList.iterator(); i.hasNext();) {
-                       Transaction prevtrans = i.next();
+               for (Iterator<Map.Entry<Long, Transaction>> i = uncommittedTransactionsMap.entrySet().iterator(); i.hasNext();) {
+                       Transaction prevtrans = i.next().getValue();
 
-                       if (prevtrans.getSequenceNumber() <= committedTransSeq) {
-                               // uncommittedTransactionsList.remove(prevtrans);
+                       if (prevtrans.getSequenceNumber() <= entry.getTransSequenceNumber()) {
                                i.remove();
                                prevtrans.setDead();
                        }
                }
        }
 
+       private void processEntry(Commit entry, Slot s) {
+               Commit prevCommit = newCommitMap.put(entry.getTransSequenceNumber(), entry);
+               if (prevCommit != null) {
+                       prevCommit.setDead();
+               }
+       }
+
        private void processEntry(TableStatus entry) {
                int newnumslots = entry.getMaxSlots();
                updateCurrMaxSize(newnumslots);
@@ -1079,7 +963,7 @@ final public class Table {
                                break;
 
                        case Entry.TypeCommit:
-                               processEntry((Commit)entry);
+                               processEntry((Commit)entry, slot);
                                break;
 
                        case Entry.TypeAbort: