edits
authorbdemsky <bdemsky@uci.edu>
Sat, 20 Jan 2018 07:50:45 +0000 (23:50 -0800)
committerbdemsky <bdemsky@uci.edu>
Sat, 20 Jan 2018 07:50:45 +0000 (23:50 -0800)
version2/src/C/PendingTransaction.cc
version2/src/C/Slot.cc
version2/src/C/Table.cc

index be7575f7e9aac70bf87ba2f0108408f411b1d0ee..b1f6db3b2a21b44d2a339d0a889cb0886f2cf60b 100644 (file)
@@ -116,7 +116,7 @@ Transaction *PendingTransaction::createTransaction() {
        Array<char> *charData = convertDataToBytes();
 
        int currentPosition = 0;
-       for(int remaining = charData->length(); remaining > 0;) {
+       for (int remaining = charData->length(); remaining > 0;) {
                bool isLastPart = false;
                // determine how much to copy
                int copySize = TransactionPart_MAX_NON_HEADER_SIZE;
@@ -124,7 +124,7 @@ Transaction *PendingTransaction::createTransaction() {
                        copySize = remaining;
                        isLastPart = true;//last bit of data so last part
                }
-               
+
                // Copy to a smaller version
                Array<char> *partData = new Array<char>(copySize);
                System_arraycopy(charData, currentPosition, partData, 0, copySize);
index 3f752262b46ac7db2a31ca1feb9cf770cabebb0d..18d1a5dc0964c0b2de1829e8744c45fb9ac25895 100644 (file)
@@ -112,8 +112,8 @@ Array<char> *Slot::encode(Mac *mac) {
        bb->putLong(seqnum);
        bb->putLong(machineid);
        bb->putInt(entries->size());
-       for(uint ei=0; ei < entries->size(); ei++) {
-               Entry * entry = entries->get(ei);
+       for (uint ei = 0; ei < entries->size(); ei++) {
+               Entry *entry = entries->get(ei);
                entry->encode(bb);
        }
        /* Compute our HMAC */
@@ -134,8 +134,8 @@ Array<char> *Slot::encode(Mac *mac) {
 
 Vector<Entry *> *Slot::getLiveEntries(bool resize) {
        Vector<Entry *> *liveEntries = new Vector<Entry *>();
-       for(uint ei=0; ei < entries->size(); ei++) {
-               Entry * entry = entries->get(ei);
+       for (uint ei = 0; ei < entries->size(); ei++) {
+               Entry *entry = entries->get(ei);
                if (entry->isLive()) {
                        if (!resize || entry->getType() != TypeTableStatus)
                                liveEntries->add(entry);
index 59c049a6c4ceb0dd8d741bd3889d91a2df488259..2c8b8e87bc1d862ee9940e692f541b0663c8ae8e 100644 (file)
@@ -1331,19 +1331,21 @@ search:
  */
 void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
 
-       // The cloud communication layer has checked slot HMACs already before decoding
+       // The cloud communication layer has checked slot HMACs already
+       // before decoding
        if (newSlots->length() == 0) {
                return;
        }
 
-       // Make sure all slots are newer than the last largest slot this client has seen
-       int64_t firstSeqNum = newSlots[0]->getSequenceNumber();
+       // Make sure all slots are newer than the last largest slot this
+       // client has seen
+       int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
        if (firstSeqNum <= sequenceNumber) {
                throw new Error("Server Error: Sent older slots!");
        }
 
-       // Create an object that can access both new slots and slots in our local chain
-       // without committing slots to our local chain
+       // Create an object that can access both new slots and slots in our
+       // local chain without committing slots to our local chain
        SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
 
        // Check that the HMAC chain is not broken
@@ -1359,15 +1361,17 @@ void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal
                updateExpectedSize();
        }
 
-       // If there is a gap, check to see if the server sent us everything->
+       // If there is a gap, check to see if the server sent us
+       // everything->
        if (firstSeqNum != (sequenceNumber + 1)) {
 
                // Check the size of the slots that were sent down by the server->
                // Can only check the size if there was a gap
                checkNumSlots(newSlots->length);
 
-               // Since there was a gap every machine must have pushed a slot or must have
-               // a last message message->  If not then the server is hiding slots
+               // Since there was a gap every machine must have pushed a slot or
+               // must have a last message message-> If not then the server is
+               // hiding slots
                if (!machineSet->isEmpty()) {
                        throw new Error("Missing record for machines: " + machineSet);
                }
@@ -1382,13 +1386,13 @@ void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal
                // Insert this slot into our local block chain copy->
                buffer->putSlot(slot);
 
-               // Keep track of how many slots are currently live (have live data in them)->
+               // Keep track of how many slots are currently live (have live data
+               // in them)->
                liveSlotCount++;
        }
 
        // Get the sequence number of the latest slot in the system
-       sequenceNumber = newSlots[newSlots->length() - 1]->getSequenceNumber();
-
+       sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
        updateLiveStateFromServer();
 
        // No Need to remember after we pulled from the server
@@ -1429,23 +1433,13 @@ void Table::updateLiveStateFromLocal() {
 }
 
 void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
-       // if (didFindTableStatus) {
-       // return;
-       // }
        int64_t prevslots = firstSequenceNumber;
 
-
        if (didFindTableStatus) {
-               // expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : expectedsize;
-               // System->out->println("Here2: " + expectedsize + "    " + numberOfSlots + "   " + prevslots);
-
        } else {
                expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
-               // System->out->println("Here: " + expectedsize);
        }
 
-       // System->out->println(numberOfSlots);
-
        didFindTableStatus = true;
        currMaxSize = numberOfSlots;
 }
@@ -1460,10 +1454,11 @@ void Table::updateExpectedSize() {
 
 
 /**
- * Check the size of the block chain to make sure there are enough slots sent back by the server->
- * This is only called when we have a gap between the slots that we have locally and the slots
- * sent by the server therefore in the slots sent by the server there will be at least 1 Table
- * status message
+ * Check the size of the block chain to make sure there are enough
+ * slots sent back by the server-> This is only called when we have a
+ * gap between the slots that we have locally and the slots sent by
+ * the server therefore in the slots sent by the server there will be
+ * at least 1 Table status message
  */
 void Table::checkNumSlots(int numberOfSlots) {
        if (numberOfSlots != expectedsize) {
@@ -1490,13 +1485,14 @@ void Table::commitNewMaxSize() {
        // Change the number of local slots to the new size
        numberOfSlots = (int32_t)currMaxSize;
 
-
-       // Recalculate the resize threshold since the size of the local buffer has changed
+       // Recalculate the resize threshold since the size of the local
+       // buffer has changed
        setResizeThreshold();
 }
 
 /**
- * Process the new transaction parts from this latest round of slots received from the server
+ * Process the new transaction parts from this latest round of slots
+ * received from the server
  */
 void Table::processNewTransactionParts() {
 
@@ -1505,7 +1501,8 @@ void Table::processNewTransactionParts() {
                return;
        }
 
-       // Iterate through all the machine Ids that we received new parts for
+       // Iterate through all the machine Ids that we received new parts
+       // for
        for (int64_t machineId : newTransactionParts->keySet()) {
                Hashtable<Pair<int64_t int32_t>, TransactionPart *> *parts = newTransactionParts->get(machineId);
 
@@ -1537,7 +1534,8 @@ void Table::processNewTransactionParts() {
                }
        }
 
-       // Clear all the new transaction parts in preparation for the next time the server sends slots
+       // Clear all the new transaction parts in preparation for the next
+       // time the server sends slots
        newTransactionParts->clear();
 }
 
@@ -1564,7 +1562,8 @@ void Table::arbitrateFromServer() {
 
 
 
-               // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
+               // Check if this machine arbitrates for this transaction if not
+               // then we cant arbitrate this transaction
                if (transaction->getArbitrator() != localMachineId) {
                        continue;
                }
@@ -1608,7 +1607,6 @@ void Table::arbitrateFromServer() {
                        lastTransactionCommitted = transactionSequenceNumber;
                } else {
                        // Guard evaluated was false so create abort
-
                        // Create the abort
                        Abort *newAbort = new Abort(NULL,
                                                                                                                                        transaction->getClientLocalSequenceNumber(),
@@ -1617,7 +1615,6 @@ void Table::arbitrateFromServer() {
                                                                                                                                        transaction->getArbitrator(),
                                                                                                                                        localArbitrationSequenceNumber);
                        localArbitrationSequenceNumber++;
-
                        generatedAborts->add(newAbort);
 
                        // Insert the abort so we can process
@@ -1625,15 +1622,12 @@ void Table::arbitrateFromServer() {
                }
 
                lastSeqNumArbOn = transactionSequenceNumber;
-
-               // liveTransactionBySequenceNumberTable->remove(transactionSequenceNumber);
        }
 
        Commit *newCommit = NULL;
 
        // If there is something to commit
        if (speculativeTableTmp->size() != 0) {
-
                // Create the commit and increment the commit sequence number
                newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
                localArbitrationSequenceNumber++;
@@ -1646,8 +1640,8 @@ void Table::arbitrateFromServer() {
                // create the commit parts
                newCommit->createCommitParts();
 
-               // Append all the commit parts to the end of the pending queue waiting for sending to the server
-
+               // Append all the commit parts to the end of the pending queue
+               // waiting for sending to the server
                // Insert the commit so we can process it
                for (CommitPart *commitPart : newCommit->getParts()->values()) {
                        processEntry(commitPart);
@@ -1671,7 +1665,8 @@ void Table::arbitrateFromServer() {
 
 Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
 
-       // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
+       // Check if this machine arbitrates for this transaction if not then
+       // we cant arbitrate this transaction
        if (transaction->getArbitrator() != localMachineId) {
                return Pair<bool, bool>(false, false);
        }
@@ -1693,9 +1688,8 @@ Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
        }
 
        if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
-               // Guard evaluated as true
-
-               // Create the commit and increment the commit sequence number
+               // Guard evaluated as true Create the commit and increment the
+               // commit sequence number
                Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
                localArbitrationSequenceNumber++;
 
@@ -1707,7 +1701,8 @@ Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
                // create the commit parts
                newCommit->createCommitParts();
 
-               // Append all the commit parts to the end of the pending queue waiting for sending to the server
+               // Append all the commit parts to the end of the pending queue
+               // waiting for sending to the server
                ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
                pendingSendArbitrationRounds->add(arbitrationRound);
 
@@ -1733,10 +1728,8 @@ Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
                updateLiveStateFromLocal();
                return Pair<bool, bool>(true, true);
        } else {
-
                if (transaction->getMachineId() == localMachineId) {
                        // For locally created messages update the status
-
                        // Guard evaluated was false so create abort
                        TransactionStatus status = transaction->getTransactionStatus();
                        if (status != NULL) {
@@ -1745,7 +1738,6 @@ Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
                } else {
                        Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
 
-
                        // Create the abort
                        Abort *newAbort = new Abort(NULL,
                                                                                                                                        transaction->getClientLocalSequenceNumber(),
@@ -1754,11 +1746,10 @@ Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
                                                                                                                                        transaction->getArbitrator(),
                                                                                                                                        localArbitrationSequenceNumber);
                        localArbitrationSequenceNumber++;
-
                        addAbortSet->add(newAbort);
 
-
-                       // Append all the commit parts to the end of the pending queue waiting for sending to the server
+                       // Append all the commit parts to the end of the pending queue
+                       // waiting for sending to the server
                        ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
                        pendingSendArbitrationRounds->add(arbitrationRound);
 
@@ -1776,10 +1767,11 @@ Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
 }
 
 /**
- * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates
+ * Compacts the arbitration data my merging commits and aggregating
+ * aborts so that a single large push of commits can be done instead
+ * of many small updates
  */
 bool Table::compactArbitrationData() {
-
        if (pendingSendArbitrationRounds->size() < 2) {
                // Nothing to compact so do nothing
                return false;
@@ -1798,12 +1790,12 @@ bool Table::compactArbitrationData() {
                ArbitrationRound *xs round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
 
                if (round->isFull() || round->didSendPart()) {
-                       // Stop since there is a part that cannot be compacted and we need to compact in order
+                       // Stop since there is a part that cannot be compacted and we
+                       // need to compact in order
                        break;
                }
 
                if (round->getCommit() == NULL) {
-
                        // Try compacting aborts only
                        int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
                        if (newSize > ArbitrationRound->MAX_PARTS) {
@@ -1812,7 +1804,6 @@ bool Table::compactArbitrationData() {
                        }
                        lastRound->addAborts(round->getAborts());
                } else {
-
                        // Create a new larger commit
                        Commit newCommit = Commit->merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
                        localArbitrationSequenceNumber++;
@@ -1841,7 +1832,6 @@ bool Table::compactArbitrationData() {
 
        if (numberToDelete != 1) {
                // If there is a compaction
-
                // Delete the previous pieces that are now in the new compacted piece
                if (numberToDelete == pendingSendArbitrationRounds->size()) {
                        pendingSendArbitrationRounds->clear();
@@ -1862,12 +1852,10 @@ bool Table::compactArbitrationData() {
 
        return false;
 }
-// bool compactArbitrationData() {
-//  return false;
-// }
 
 /**
- * Update all the commits and the committed tables, sets dead the dead transactions
+ * Update all the commits and the committed tables, sets dead the dead
+ * transactions
  */
 bool Table::updateCommittedTable() {
 
@@ -1908,7 +1896,8 @@ bool Table::updateCommittedTable() {
                }
        }
 
-       // Clear all the new commits parts in preparation for the next time the server sends slots
+       // Clear all the new commits parts in preparation for the next time
+       // the server sends slots
        newCommitParts->clear();
 
        // If we process a new commit keep track of it for future use
@@ -1938,10 +1927,13 @@ bool Table::updateCommittedTable() {
                        // Special processing if a commit is not complete
                        if (!commit->isComplete()) {
                                if (i == (commitSequenceNumbers->size() - 1)) {
-                                       // If there is an incomplete commit and this commit is the latest one seen then this commit cannot be processed and there are no other commits
+                                       // If there is an incomplete commit and this commit is the
+                                       // latest one seen then this commit cannot be processed and
+                                       // there are no other commits
                                        break;
                                } else {
-                                       // This is a commit that was already dead but parts of it are still in the block chain (not flushed out yet)->
+                                       // This is a commit that was already dead but parts of it
+                                       // are still in the block chain (not flushed out yet)->
                                        // Delete it and move on
                                        commit->setDead();
                                        commitForClientTable->remove(commit->getSequenceNumber());
@@ -1972,7 +1964,8 @@ bool Table::updateCommittedTable() {
                                lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
                        }
 
-                       // We have already seen this commit before so need to do the full processing on this commit
+                       // We have already seen this commit before so need to do the
+                       // full processing on this commit
                        if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
 
                                // Update the last transaction that was updated if we can
@@ -1988,9 +1981,10 @@ bool Table::updateCommittedTable() {
                                continue;
                        }
 
-                       // If we got here then this is a brand new commit and needs full processing
-
-                       // Get what commits should be edited, these are the commits that have live values for their keys
+                       // If we got here then this is a brand new commit and needs full
+                       // processing
+                       // Get what commits should be edited, these are the commits that
+                       // have live values for their keys
                        Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
                        for (KeyValue *kv : commit->getKeyValueUpdateSet()) {
                                commitsToEdit->add(liveCommitsByKeyTable->get(kv->getKey()));
@@ -2039,7 +2033,8 @@ bool Table::updateCommittedTable() {
 }
 
 /**
- * Create the speculative table from transactions that are still live and have come from the cloud
+ * Create the speculative table from transactions that are still live
+ * and have come from the cloud
  */
 bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
        if (liveTransactionBySequenceNumberTable->keySet()->size() == 0) {
@@ -2047,7 +2042,8 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
                return false;
        }
 
-       // Create a list of the transaction sequence numbers and sort them from oldest to newest
+       // Create a list of the transaction sequence numbers and sort them
+       // from oldest to newest
        Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>(liveTransactionBySequenceNumberTable->keySet());
        Collections->sort(transactionSequenceNumbersSorted);
 
@@ -2055,8 +2051,10 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
 
 
        if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
-               // If there is a gap in the transaction sequence numbers then there was a commit or an abort of a transaction
-               // OR there was a new commit (Could be from offline commit) so a redo the speculation from scratch
+               // If there is a gap in the transaction sequence numbers then
+               // there was a commit or an abort of a transaction OR there was a
+               // new commit (Could be from offline commit) so a redo the
+               // speculation from scratch
 
                // Start from scratch
                speculatedKeyValueTable->clear();
@@ -2084,8 +2082,9 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
                Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
 
                if (!transaction->isComplete()) {
-                       // If there is an incomplete transaction then there is nothing we can do
-                       // add this transactions arbitrator to the list of arbitrators we should ignore
+                       // If there is an incomplete transaction then there is nothing
+                       // we can do add this transactions arbitrator to the list of
+                       // arbitrators we should ignore
                        incompleteTransactionArbitrator->add(transaction->getArbitrator());
                        didSkip = true;
                        continue;
@@ -2116,7 +2115,8 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
 }
 
 /**
- * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer
+ * Create the pending transaction speculative table from transactions
+ * that are still in the pending transaction buffer
  */
 void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
        if (pendingTransactionQueue->size() == 0) {
@@ -2154,7 +2154,8 @@ void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOr
 }
 
 /**
- * Set dead and remove from the live transaction tables the transactions that are dead
+ * Set dead and remove from the live transaction tables the
+ * transactions that are dead
  */
 void Table::updateLiveTransactionsAndStatus() {
 
@@ -2203,35 +2204,27 @@ void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLo
        // Process each entry in the slot
        for (Entry *entry : slot->getEntries()) {
                switch (entry->getType()) {
-
                case TypeCommitPart:
                        processEntry((CommitPart *)entry);
                        break;
-
                case TypeAbort:
                        processEntry((Abort *)entry);
                        break;
-
                case TypeTransactionPart:
                        processEntry((TransactionPart *)entry);
                        break;
-
                case TypeNewKey:
                        processEntry((NewKey *)entry);
                        break;
-
                case TypeLastMessage:
                        processEntry((LastMessage *)entry, machineSet);
                        break;
-
                case TypeRejectedMessage:
                        processEntry((RejectedMessage *)entry, indexer);
                        break;
-
                case TypeTableStatus:
                        processEntry((TableStatus *)entry, slot->getSequenceNumber());
                        break;
-
                default:
                        throw new Error("Unrecognized type: " + entry->getType());
                }
@@ -2247,10 +2240,10 @@ void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
 }
 
 /**
- * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
+ * Add the new key to the arbitrators table and update the set of live
+ * new keys (in case of a rescued new key message)
  */
 void Table::processEntry(NewKey *entry) {
-
        // Update the arbitrator table with the new key information
        arbitratorTable->put(entry->getKey(), entry->getMachineID());
 
@@ -2263,18 +2256,19 @@ void Table::processEntry(NewKey *entry) {
 }
 
 /**
- * Process new table status entries and set dead the old ones as new ones come in->
- * keeps track of the largest and smallest table status seen in this current round
- * of updating the local copy of the block chain
+ * Process new table status entries and set dead the old ones as new
+ * ones come in-> keeps track of the largest and smallest table status
+ * seen in this current round of updating the local copy of the block
+ * chain
  */
 void Table::processEntry(TableStatus entry, int64_t seq) {
        int newNumSlots = entry->getMaxSlots();
        updateCurrMaxSize(newNumSlots);
-
        initExpectedSize(seq, newNumSlots);
 
        if (liveTableStatus != NULL) {
-               // We have a larger table status so the old table status is no int64_ter alive
+               // We have a larger table status so the old table status is no
+               // int64_ter alive
                liveTableStatus->setDead();
        }
 
@@ -2283,7 +2277,8 @@ void Table::processEntry(TableStatus entry, int64_t seq) {
 }
 
 /**
- * Check old messages to see if there is a block chain violation-> Also
+ * Check old messages to see if there is a block chain violation->
+ * Also
  */
 void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
        int64_t oldSeqNum = entry->getOldSeqNum();
@@ -2292,16 +2287,15 @@ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
        int64_t machineId = entry->getMachineID();
        int64_t seq = entry->getSequenceNumber();
 
-
-       // Check if we have messages that were supposed to be rejected in our local block chain
+       // Check if we have messages that were supposed to be rejected in
+       // our local block chain
        for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
-
                // Get the slot
                Slot *slot = indexer->getSlot(seqNum);
 
                if (slot != NULL) {
-                       // If we have this slot make sure that it was not supposed to be a rejected slot
-
+                       // If we have this slot make sure that it was not supposed to be
+                       // a rejected slot
                        int64_t slotMachineId = slot->getMachineID();
                        if (isequal != (slotMachineId == machineId)) {
                                throw new Error("Server Error: Trying to insert rejected message for slot " + seqNum);
@@ -2309,11 +2303,10 @@ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
                }
        }
 
-
-       // Create a list of clients to watch until they see this rejected message entry->
+       // Create a list of clients to watch until they see this rejected
+       // message entry->
        Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
        for (Map->Entry<int64_t, Pair<int64_t, Liveness *> > *lastMessageEntry : lastMessageTable->entrySet()) {
-
                // Machine ID for the last message entry
                int64_t lastMessageEntryMachineId = lastMessageEntry->getKey();
 
@@ -2327,15 +2320,14 @@ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
                int64_t entrySequenceNumber = lastMessageValue->getFirst();
 
                if (entrySequenceNumber < seq) {
-
-                       // Add this rejected message to the set of messages that this machine ID did not see yet
+                       // Add this rejected message to the set of messages that this
+                       // machine ID did not see yet
                        addWatchVector(lastMessageEntryMachineId, entry);
-
-                       // This client did not see this rejected message yet so add it to the watch set to monitor
+                       // This client did not see this rejected message yet so add it
+                       // to the watch set to monitor
                        deviceWatchSet->add(lastMessageEntryMachineId);
                }
        }
-
        if (deviceWatchSet->isEmpty()) {
                // This rejected message has been seen by all the clients so
                entry->setDead();
@@ -2346,12 +2338,10 @@ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
 }
 
 /**
- * Check if this abort is live, if not then save it so we can kill it later->
- * update the last transaction number that was arbitrated on->
+ * Check if this abort is live, if not then save it so we can kill it
+ * later-> update the last transaction number that was arbitrated on->
  */
 void Table::processEntry(Abort *entry) {
-
-
        if (entry->getTransactionSequenceNumber() != -1) {
                // update the transaction status if it was sent to the server
                TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
@@ -2360,7 +2350,8 @@ void Table::processEntry(Abort *entry) {
                }
        }
 
-       // Abort has not been seen by the client it is for yet so we need to keep track of it
+       // Abort has not been seen by the client it is for yet so we need to
+       // keep track of it
        Abort *previouslySeenAbort = liveAbortTable->put(entry->getAbortId(), entry);
        if (previouslySeenAbort != NULL) {
                previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version
@@ -2371,7 +2362,6 @@ void Table::processEntry(Abort *entry) {
        }
 
        if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
-
                // The machine already saw this so it is dead
                entry->setDead();
                liveAbortTable->remove(entry->getAbortId());
@@ -2379,13 +2369,11 @@ void Table::processEntry(Abort *entry) {
                if (entry->getTransactionArbitrator() == localMachineId) {
                        liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
                }
-
                return;
        }
 
        // Update the last arbitration data that we have seen so far
        if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator()) != NULL) {
-
                int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
                if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
                        // Is larger
@@ -2396,17 +2384,16 @@ void Table::processEntry(Abort *entry) {
                lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
        }
 
-
        // Set dead a transaction if we can
        Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber()));
        if (transactionToSetDead != NULL) {
                liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
        }
 
-       // Update the last transaction sequence number that the arbitrator arbitrated on
+       // Update the last transaction sequence number that the arbitrator
+       // arbitrated on
        int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator());
        if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
-
                // Is a valid one
                if (entry->getTransactionSequenceNumber() != -1) {
                        lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
@@ -2415,10 +2402,12 @@ void Table::processEntry(Abort *entry) {
 }
 
 /**
- * Set dead the transaction part if that transaction is dead and keep track of all new parts
+ * Set dead the transaction part if that transaction is dead and keep
+ * track of all new parts
  */
 void Table::processEntry(TransactionPart *entry) {
-       // Check if we have already seen this transaction and set it dead OR if it is not alive
+       // Check if we have already seen this transaction and set it dead OR
+       // if it is not alive
        int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId());
        if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry->getSequenceNumber())) {
                // This transaction is dead, it was already committed or aborted
@@ -2435,7 +2424,8 @@ void Table::processEntry(TransactionPart *entry) {
                newTransactionParts->put(entry->getMachineId(), transactionPart);
        }
 
-       // Update the part and set dead ones we have already seen (got a rescued version)
+       // Update the part and set dead ones we have already seen (got a
+       // rescued version)
        TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
        if (previouslySeenPart != NULL) {
                previouslySeenPart->setDead();
@@ -2449,25 +2439,21 @@ void Table::processEntry(CommitPart *entry) {
        // Update the last transaction that was updated if we can
        if (entry->getTransactionSequenceNumber() != -1) {
                int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId());
-
-               // Update the last transaction sequence number that the arbitrator arbitrated on
+               // Update the last transaction sequence number that the arbitrator
+               // arbitrated on
                if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) {
                        lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
                }
        }
 
-
-
-
        Hashtable<Pair<int64_t, int32_t>, CommitPart *> *commitPart = newCommitParts->get(entry->getMachineId());
-
        if (commitPart == NULL) {
                // Don't have a table for this machine Id yet so make one
                commitPart = new Hashtable<Pair<int64_t, int32_t>, CommitPart *>();
                newCommitParts->put(entry->getMachineId(), commitPart);
        }
-
-       // Update the part and set dead ones we have already seen (got a rescued version)
+       // Update the part and set dead ones we have already seen (got a
+       // rescued version)
        CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
        if (previouslySeenPart != NULL) {
                previouslySeenPart->setDead();
@@ -2475,33 +2461,29 @@ void Table::processEntry(CommitPart *entry) {
 }
 
 /**
- * Update the last message seen table->  Update and set dead the appropriate RejectedMessages as clients see them->
- * Updates the live aborts, removes those that are dead and sets them dead->
- * Check that the last message seen is correct and that there is no mismatch of our own last message or that
- * other clients have not had a rollback on the last message->
+ * Update the last message seen table-> Update and set dead the
+ * appropriate RejectedMessages as clients see them-> Updates the live
+ * aborts, removes those that are dead and sets them dead-> Check that
+ * the last message seen is correct and that there is no mismatch of
+ * our own last message or that other clients have not had a rollback
+ * on the last message->
  */
 void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<intr64_t> *machineSet) {
-
        // We have seen this machine ID
        machineSet->remove(machineId);
 
        // Get the set of rejected messages that this machine Id is has not seen yet
        Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
-
        // If there is a rejected message that this machine Id has not seen yet
        if (watchset != NULL) {
-
-               // Go through each rejected message that this machine Id has not seen yet
+               // Go through each rejected message that this machine Id has not
+               // seen yet
                for (Iterator<RejectedMessage *> *rmit = watchset->iterator(); rmit->hasNext(); ) {
-
                        RejectedMessage *rm = rmit->next();
-
                        // If this machine Id has seen this rejected message->->->
                        if (rm->getSequenceNumber() <= seqNum) {
-
                                // Remove it from our watchlist
                                rmit->remove();
-
                                // Decrement machines that need to see this notification
                                rm->removeWatcher(machineId);
                        }
@@ -2511,19 +2493,14 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liven
        // Set dead the abort
        for (Iterator<Map->Entry<Pair<int64_t, int64_t>, Abort *> > i = liveAbortTable->entrySet()->iterator(); i->hasNext();) {
                Abort *abort = i->next()->getValue();
-
                if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
                        abort->setDead();
                        i->remove();
-
                        if (abort->getTransactionArbitrator() == localMachineId) {
                                liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
                        }
                }
        }
-
-
-
        if (machineId == localMachineId) {
                // Our own messages are immediately dead->
                if (liveness instanceof LastMessage) {
@@ -2534,7 +2511,6 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liven
                        throw new Error("Unrecognized type");
                }
        }
-
        // Get the old last message for this device
        Pair<int64_t, Liveness *> lastMessageEntry = lastMessageTable->put(machineId, Pair<int64_t, Liveness *>(seqNum, liveness));
        if (lastMessageEntry == NULL) {
@@ -2555,16 +2531,13 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liven
                        throw new Error("Unrecognized type");
                }
        }
-
        // Make sure the server is not playing any games
        if (machineId == localMachineId) {
-
                if (hadPartialSendToServer) {
                        // We were not making any updates and we had a machine mismatch
                        if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
                                throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: " +  lastMessageSeqNum  + " got: " + seqNum);
                        }
-
                } else {
                        // We were not making any updates and we had a machine mismatch
                        if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
@@ -2579,8 +2552,9 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liven
 }
 
 /**
- * Add a rejected message entry to the watch set to keep track of which clients have seen that
- * rejected message entry and which have not->
+ * Add a rejected message entry to the watch set to keep track of
+ * which clients have seen that rejected message entry and which have
+ * not.
  */
 void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
        Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
@@ -2596,12 +2570,11 @@ void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
  * Check if the HMAC chain is not violated
  */
 void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
-       for (int i = 0; i < newSlots->length; i++) {
-               Slot *currSlot = newSlots[i];
+       for (int i = 0; i < newSlots->length(); i++) {
+               Slot *currSlot = newSlots->get(i);
                Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
                if (prevSlot != NULL &&
                                !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
                        throw new Error("Server Error: Invalid HMAC Chain");
        }
 }
-