edits
[iotcloud.git] / version2 / src / C / Table.cc
index 7c3fde05215cfeb3b547f59d8064e38039b7ef51..1a33f76f1f1bc98f7e8c267e90e4ddc515655bc0 100644 (file)
@@ -60,7 +60,6 @@ Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, i
        lastIsNewKey(false),
        lastNewSize(0),
        lastTransactionPartsSent(NULL),
-       lastPendingSendArbitrationEntriesToDelete(NULL),
        lastNewKey(NULL),
        committedKeyValueTable(NULL),
        speculatedKeyValueTable(NULL),
@@ -123,7 +122,6 @@ Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
        lastIsNewKey(false),
        lastNewSize(0),
        lastTransactionPartsSent(NULL),
-       lastPendingSendArbitrationEntriesToDelete(NULL),
        lastNewKey(NULL),
        committedKeyValueTable(NULL),
        speculatedKeyValueTable(NULL),
@@ -167,29 +165,115 @@ Table::~Table() {
        delete speculatedKeyValueTable;
        delete pendingTransactionSpeculatedKeyValueTable;
        delete liveNewKeyTable;
-       delete lastMessageTable;
-       delete rejectedMessageWatchVectorTable;
+       {
+               SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
+               while (lmit->hasNext()) {
+                       Pair<int64_t, Liveness *> * pair = lastMessageTable->get(lmit->next());
+                       delete pair;
+               }
+               delete lmit;
+               delete lastMessageTable;
+       }
+       if (pendingTransactionBuilder != NULL)
+               delete pendingTransactionBuilder;
+       {
+               SetIterator<int64_t, Hashset<RejectedMessage *> *> *rmit = getKeyIterator(rejectedMessageWatchVectorTable);
+               while(rmit->hasNext()) {
+                       int64_t machineid = rmit->next();
+                       Hashset<RejectedMessage *> * rmset = rejectedMessageWatchVectorTable->get(machineid);
+                       SetIterator<RejectedMessage *, RejectedMessage *> * mit = rmset->iterator();
+                       while (mit->hasNext()) {
+                               RejectedMessage * rm = mit->next();
+                               delete rm;
+                       }
+                       delete mit;
+                       delete rmset;
+               }
+               delete rmit;
+               delete rejectedMessageWatchVectorTable;
+       }
        delete arbitratorTable;
        delete liveAbortTable;
-       delete newTransactionParts;
-       delete newCommitParts;
+       {
+               SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts);
+               while (partsit->hasNext()) {
+                       int64_t machineId = partsit->next();
+                       Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal();
+                       delete parts;
+               }
+               delete partsit;
+               delete newTransactionParts;
+       }
+       {
+               SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
+               while (partsit->hasNext()) {
+                       int64_t machineId = partsit->next();
+                       Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal();
+                       delete parts;
+               }
+               delete partsit;
+               delete newCommitParts;
+       }
        delete lastArbitratedTransactionNumberByArbitratorTable;
        delete liveTransactionBySequenceNumberTable;
        delete liveTransactionByTransactionIdTable;
-       delete liveCommitsTable;
+       {
+               SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
+               while (liveit->hasNext()) {
+                       int64_t arbitratorId = liveit->next();
+                       
+                       // Get all the commits for a specific arbitrator
+                       Hashtable<int64_t, Commit *> *commitForClientTable = liveit->currVal();
+                       {
+                               SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
+                               while (clientit->hasNext()) {
+                                       int64_t id = clientit->next();
+                                       delete commitForClientTable->get(id);
+                               }
+                               delete clientit;
+                       }
+                       
+                       delete commitForClientTable;
+               }
+               delete liveit;
+               delete liveCommitsTable;
+       }
        delete liveCommitsByKeyTable;
        delete lastCommitSeenSequenceNumberByArbitratorTable;
        delete rejectedSlotVector;
-       delete pendingTransactionQueue;
+       {
+               uint size = pendingTransactionQueue->size();
+               for (uint iter = 0; iter < size; iter++) {
+                       delete pendingTransactionQueue->get(iter);
+               }
+               delete pendingTransactionQueue;
+       }
        delete pendingSendArbitrationEntriesToDelete;
-       delete transactionPartsSent;
+       {
+               SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
+               while (trit->hasNext()) {
+                       Transaction *transaction = trit->next();
+                       delete trit->currVal();
+               }
+               delete trit;
+               delete transactionPartsSent;
+       }
        delete outstandingTransactionStatus;
        delete liveAbortsGeneratedByLocal;
        delete offlineTransactionsCommittedAndAtServer;
        delete localCommunicationTable;
        delete lastTransactionSeenFromMachineFromServer;
-       delete pendingSendArbitrationRounds;
+       {
+               for(uint i = 0; i < pendingSendArbitrationRounds->size(); i++) {
+                       delete pendingSendArbitrationRounds->get(i);
+               }
+               delete pendingSendArbitrationRounds;
+       }
+       if (lastTransactionPartsSent != NULL)
+               delete lastTransactionPartsSent;
        delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
+       if (lastNewKey)
+               delete lastNewKey;
 }
 
 /**
@@ -245,7 +329,7 @@ void Table::initTable() {
        Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
        localSequenceNumber++;
        TableStatus *status = new TableStatus(s, numberOfSlots);
-       s->addEntry(status);
+       s->addShallowEntry(status);
        Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
 
        if (array == NULL) {
@@ -253,10 +337,15 @@ void Table::initTable() {
                array->set(0, s);
                // update local block chain
                validateAndUpdate(array, true);
+               delete array;
        } else if (array->length() == 1) {
                // in case we did push the slot BUT we failed to init it
                validateAndUpdate(array, true);
+               delete s;
+               delete array;
        } else {
+               delete s;
+               delete array;
                throw new Error("Error on initialization");
        }
 }
@@ -269,6 +358,7 @@ void Table::rebuild() {
        // Just pull the latest slots from the server
        Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
        validateAndUpdate(newslots, true);
+       delete newslots;
        sendToServer(NULL);
        updateLiveTransactionsAndStatus();
 }
@@ -289,7 +379,7 @@ IoTString *Table::getCommitted(IoTString *key)  {
        KeyValue *kv = committedKeyValueTable->get(key);
 
        if (kv != NULL) {
-               return kv->getValue();
+               return new IoTString(kv->getValue());
        } else {
                return NULL;
        }
@@ -307,7 +397,7 @@ IoTString *Table::getSpeculative(IoTString *key) {
        }
 
        if (kv != NULL) {
-               return kv->getValue();
+               return new IoTString(kv->getValue());
        } else {
                return NULL;
        }
@@ -328,7 +418,7 @@ IoTString *Table::getCommittedAtomic(IoTString *key) {
 
        if (kv != NULL) {
                pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
-               return kv->getValue();
+               return new IoTString(kv->getValue());
        } else {
                pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
                return NULL;
@@ -358,7 +448,7 @@ IoTString *Table::getSpeculativeAtomic(IoTString *key) {
 
        if (kv != NULL) {
                pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
-               return kv->getValue();
+               return new IoTString(kv->getValue());
        } else {
                pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
                return NULL;
@@ -369,6 +459,7 @@ bool Table::update()  {
        try {
                Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
                validateAndUpdate(newSlots, false);
+               delete newSlots;
                sendToServer(NULL);
                updateLiveTransactionsAndStatus();
                return true;
@@ -401,11 +492,12 @@ bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
 
 void Table::startTransaction() {
        // Create a new transaction, invalidates any old pending transactions.
+       if (pendingTransactionBuilder != NULL)
+               delete pendingTransactionBuilder;
        pendingTransactionBuilder = new PendingTransaction(localMachineId);
 }
 
-void Table::addKV(IoTString *key, IoTString *value) {
-
+void Table::put(IoTString *key, IoTString *value) {
        // Make sure it is a valid key
        if (!arbitratorTable->contains(key)) {
                throw new Error("Key not Found.");
@@ -418,7 +510,7 @@ void Table::addKV(IoTString *key, IoTString *value) {
        }
 
        // Add the key value to this transaction
-       KeyValue *kv = new KeyValue(key, value);
+       KeyValue *kv = new KeyValue(new IoTString(key), new IoTString(value));
        pendingTransactionBuilder->addKV(kv);
 }
 
@@ -444,9 +536,12 @@ TransactionStatus *Table::commitTransaction() {
                pendingTransactionQueue->add(newTransaction);
        } else {
                arbitrateOnLocalTransaction(newTransaction);
+               delete newTransaction;
                updateLiveStateFromLocal();
        }
-
+       if (pendingTransactionBuilder != NULL)
+               delete pendingTransactionBuilder;
+       
        pendingTransactionBuilder = new PendingTransaction(localMachineId);
 
        try {
@@ -476,6 +571,7 @@ TransactionStatus *Table::commitTransaction() {
 
                                if (sendReturn.getSecond()) {
                                        // did arbitrate
+                                       delete transaction;
                                        oldindex--;
                                }
                        }
@@ -500,6 +596,36 @@ int64_t Table::getLocalSequenceNumber() {
        return localSequenceNumber;
 }
 
+void Table::processTransactionList(bool handlePartial) {
+       SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
+       while (trit->hasNext()) {
+               Transaction *transaction = trit->next();
+               transaction->resetServerFailure();
+               // Update which transactions parts still need to be sent
+               transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
+               // Add the transaction status to the outstanding list
+               outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
+               
+               // Update the transaction status
+               transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
+               
+               // Check if all the transaction parts were successfully
+               // sent and if so then remove it from pending
+               if (transaction->didSendAllParts()) {
+                       transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
+                       pendingTransactionQueue->remove(transaction);
+                       delete transaction;
+               } else if (handlePartial) {
+                       transaction->resetServerFailure();
+                       // Set the transaction sequence number back to nothing
+                       if (!transaction->didSendAPartToServer()) {
+                               transaction->setSequenceNumber(-1);
+                       }
+               }
+       }
+       delete trit;
+}
+
 NewKey * Table::handlePartialSend(NewKey * newKey) {
        //Didn't receive acknowledgement for last send
        //See if the server has received a newer slot
@@ -511,97 +637,23 @@ NewKey * Table::handlePartialSend(NewKey * newKey) {
                bool sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey, &wasInserted, &newSlots);
                
                if (sendSlotsReturn) {
+                       lastSlotAttemptedToSend = NULL;
                        if (newKey != NULL) {
                                if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
+                                       delete newKey;
                                        newKey = NULL;
                                }
                        }
-                       
-                       SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
-                       while (trit->hasNext()) {
-                               Transaction *transaction = trit->next();
-                               transaction->resetServerFailure();
-                               // Update which transactions parts still need to be sent
-                               transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
-                               // Add the transaction status to the outstanding list
-                               outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
-                               
-                               // Update the transaction status
-                               transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
-                               
-                               // Check if all the transaction parts were successfully
-                               // sent and if so then remove it from pending
-                               if (transaction->didSendAllParts()) {
-                                       transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
-                                       pendingTransactionQueue->remove(transaction);
-                               }
-                       }
-                       delete trit;
+                       processTransactionList(false);
                } else {
-                       bool isInserted = false;
-                       for (uint si = 0; si < newSlots->length(); si++) {
-                               Slot *s = newSlots->get(si);
-                               if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
-                                       isInserted = true;
-                                       break;
-                               }
-                       }
-                       
-                       for (uint si = 0; si < newSlots->length(); si++) {
-                               Slot *s = newSlots->get(si);
-                               if (isInserted) {
-                                       break;
-                               }
-                               
-                               // Process each entry in the slot
-                               Vector<Entry *> *ventries = s->getEntries();
-                               uint vesize = ventries->size();
-                               for (uint vei = 0; vei < vesize; vei++) {
-                                       Entry *entry = ventries->get(vei);
-                                       if (entry->getType() == TypeLastMessage) {
-                                               LastMessage *lastMessage = (LastMessage *)entry;
-                                               if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
-                                                       isInserted = true;
-                                                       break;
-                                               }
-                                       }
-                               }
-                       }
-                       
-                       if (isInserted) {
+                       if (checkSend(newSlots, lastSlotAttemptedToSend)) {
                                if (newKey != NULL) {
                                        if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
+                                               delete newKey;
                                                newKey = NULL;
                                        }
                                }
-                               
-                               SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
-                               while (trit->hasNext()) {
-                                       Transaction *transaction = trit->next();
-                                       transaction->resetServerFailure();
-                                       
-                                       // Update which transactions parts still need to be sent
-                                       transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
-                                       
-                                       // Add the transaction status to the outstanding list
-                                       outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
-                                       
-                                       // Update the transaction status
-                                       transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
-                                       
-                                       // Check if all the transaction parts were successfully sent and if so then remove it from pending
-                                       if (transaction->didSendAllParts()) {
-                                               transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
-                                               pendingTransactionQueue->remove(transaction);
-                                       } else {
-                                               transaction->resetServerFailure();
-                                               // Set the transaction sequence number back to nothing
-                                               if (!transaction->didSendAPartToServer()) {
-                                                       transaction->setSequenceNumber(-1);
-                                               }
-                                       }
-                               }
-                               delete trit;
+                               processTransactionList(true);
                        }
                }
                
@@ -620,73 +672,16 @@ NewKey * Table::handlePartialSend(NewKey * newKey) {
                        // insert into the local block chain
                        validateAndUpdate(newSlots, true);
                }
-               // continue;
        } else {
-               bool isInserted = false;
-               for (uint si = 0; si < newSlots->length(); si++) {
-                       Slot *s = newSlots->get(si);
-                       if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
-                               isInserted = true;
-                               break;
-                       }
-               }
-               
-               for (uint si = 0; si < newSlots->length(); si++) {
-                       Slot *s = newSlots->get(si);
-                       if (isInserted) {
-                               break;
-                       }
-                       
-                       // Process each entry in the slot
-                       Vector<Entry *> *entries = s->getEntries();
-                       uint eSize = entries->size();
-                       for (uint ei = 0; ei < eSize; ei++) {
-                               Entry *entry = entries->get(ei);
-                               
-                               if (entry->getType() == TypeLastMessage) {
-                                       LastMessage *lastMessage = (LastMessage *)entry;
-                                       if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
-                                               isInserted = true;
-                                               break;
-                                       }
-                               }
-                       }
-               }
-               
-               if (isInserted) {
+               if (checkSend(newSlots, lastSlotAttemptedToSend)) {
                        if (newKey != NULL) {
                                if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
+                                       delete newKey;
                                        newKey = NULL;
                                }
                        }
-                       
-                       SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
-                       while (trit->hasNext()) {
-                               Transaction *transaction = trit->next();
-                               transaction->resetServerFailure();
-                               
-                               // Update which transactions parts still need to be sent
-                               transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
-                               
-                               // Add the transaction status to the outstanding list
-                               outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
-                               
-                               // Update the transaction status
-                               transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
-                               
-                               // Check if all the transaction parts were successfully sent and if so then remove it from pending
-                               if (transaction->didSendAllParts()) {
-                                       transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
-                                       pendingTransactionQueue->remove(transaction);
-                               } else {
-                                       transaction->resetServerFailure();
-                                       // Set the transaction sequence number back to nothing
-                                       if (!transaction->didSendAPartToServer()) {
-                                               transaction->setSequenceNumber(-1);
-                                       }
-                               }
-                       }
-                       delete trit;
+
+                       processTransactionList(true);
                } else {
                        SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
                        while (trit->hasNext()) {
@@ -703,9 +698,22 @@ NewKey * Table::handlePartialSend(NewKey * newKey) {
                // insert into the local block chain
                validateAndUpdate(newSlots, true);
        }
+       delete newSlots;
        return newKey;
 }
 
+void Table::clearSentParts() {
+       // Clear the sent data since we are trying again
+       pendingSendArbitrationEntriesToDelete->clear();
+       SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
+       while (trit->hasNext()) {
+               Transaction *transaction = trit->next();
+               delete trit->currVal();
+       }
+       delete trit;
+       transactionPartsSent->clear();
+}
+
 bool Table::sendToServer(NewKey *newKey) {
        if (hadPartialSendToServer) {
                newKey = handlePartialSend(newKey);
@@ -720,18 +728,18 @@ bool Table::sendToServer(NewKey *newKey) {
                        
                        // If there is a new key with same name then end
                        if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
+                               delete newKey;
                                return false;
                        }
 
                        // Create the slot
-                       Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
+                       Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, new Array<char>(buffer->getSlot(sequenceNumber)->getHMAC()), localSequenceNumber);
                        localSequenceNumber++;
 
                        // Try to fill the slot with data
-                       ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
-                       bool needsResize = fillSlotsReturn.getFirst();
-                       int newSize = fillSlotsReturn.getSecond();
-                       bool insertedNewKey = fillSlotsReturn.getThird();
+                       int newSize = 0;
+                       bool insertedNewKey = false;
+                       bool needsResize = fillSlot(slot, false, newKey, newSize, insertedNewKey);
 
                        if (needsResize) {
                                // Reset which transaction to send
@@ -748,31 +756,34 @@ bool Table::sendToServer(NewKey *newKey) {
                                delete trit;
 
                                // Clear the sent data since we are trying again
-                               pendingSendArbitrationEntriesToDelete->clear();
-                               transactionPartsSent->clear();
-
+                               clearSentParts();
+                                       
                                // We needed a resize so try again
-                               fillSlot(slot, true, newKey);
+                               fillSlot(slot, true, newKey, newSize, insertedNewKey);
                        }
-
+                       if (lastSlotAttemptedToSend != NULL)
+                               delete lastSlotAttemptedToSend;
+                       
                        lastSlotAttemptedToSend = slot;
                        lastIsNewKey = (newKey != NULL);
                        lastInsertedNewKey = insertedNewKey;
                        lastNewSize = newSize;
+                       if (( newKey != lastNewKey) && (lastNewKey != NULL))
+                               delete lastNewKey;
                        lastNewKey = newKey;
+                       if (lastTransactionPartsSent != NULL)
+                               delete lastTransactionPartsSent;
                        lastTransactionPartsSent = transactionPartsSent->clone();
-                       lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
 
                        Array<Slot *> * newSlots = NULL;
                        bool wasInserted = false;
                        bool sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL, &wasInserted, &newSlots);
 
                        if (sendSlotsReturn) {
+                               lastSlotAttemptedToSend = NULL;
                                // Did insert into the block chain
-
                                if (insertedNewKey) {
                                        // This slot was what was inserted not a previous slot
-
                                        // New Key was successfully inserted into the block chain so dont want to insert it again
                                        newKey = NULL;
                                }
@@ -785,34 +796,14 @@ bool Table::sendToServer(NewKey *newKey) {
                                        round->removeParts(pendingSendArbitrationEntriesToDelete);
 
                                        if (!round->isDoneSending()) {
-                                               // Sent all the parts
+                                               //Add part back in
                                                pendingSendArbitrationRounds->set(oldcount++,
                                                                                                                                                                                        pendingSendArbitrationRounds->get(i));
-                                       }
+                                       } else
+                                               delete pendingSendArbitrationRounds->get(i);
                                }
                                pendingSendArbitrationRounds->setSize(oldcount);
-
-                               SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
-                               while (trit->hasNext()) {
-                                       Transaction *transaction = trit->next();
-                                       transaction->resetServerFailure();
-
-                                       // Update which transactions parts still need to be sent
-                                       transaction->removeSentParts(transactionPartsSent->get(transaction));
-
-                                       // Add the transaction status to the outstanding list
-                                       outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
-
-                                       // Update the transaction status
-                                       transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
-
-                                       // Check if all the transaction parts were successfully sent and if so then remove it from pending
-                                       if (transaction->didSendAllParts()) {
-                                               transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
-                                               pendingTransactionQueue->remove(transaction);
-                                       }
-                               }
-                               delete trit;
+                               processTransactionList(false);
                        } else {
                                // Reset which transaction to send
                                SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
@@ -829,15 +820,14 @@ bool Table::sendToServer(NewKey *newKey) {
                        }
 
                        // Clear the sent data in preparation for next send
-                       pendingSendArbitrationEntriesToDelete->clear();
-                       transactionPartsSent->clear();
+                       clearSentParts();
 
                        if (newSlots->length() != 0) {
                                // insert into the local block chain
                                validateAndUpdate(newSlots, true);
                        }
+                       delete newSlots;
                }
-
        } catch (ServerException *e) {
                if (e->getType() != ServerException_TypeInputTimeout) {
                        // Nothing was able to be sent to the server so just clear these data structures
@@ -866,8 +856,7 @@ bool Table::sendToServer(NewKey *newKey) {
                        delete trit;
                }
 
-               pendingSendArbitrationEntriesToDelete->clear();
-               transactionPartsSent->clear();
+               clearSentParts();
 
                throw e;
        }
@@ -1147,6 +1136,40 @@ Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
        return returnData;
 }
 
+/** Checks whether a given slot was sent using new slots in
+               array. Returns true if sent and false otherwise.  */
+
+bool Table::checkSend(Array<Slot *> * array, Slot *checkSlot) {
+       uint size = array->length();
+       for (uint i = 0; i < size; i++) {
+               Slot *s = array->get(i);
+               if ((s->getSequenceNumber() == checkSlot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
+                       return true;
+               }
+       }
+       
+       //Also need to see if other machines acknowledged our message
+       for (uint i = 0; i < size; i++) {
+               Slot *s = array->get(i);
+               
+               // Process each entry in the slot
+               Vector<Entry *> *entries = s->getEntries();
+               uint eSize = entries->size();
+               for (uint ei = 0; ei < eSize; ei++) {
+                       Entry *entry = entries->get(ei);
+                       
+                       if (entry->getType() == TypeLastMessage) {
+                               LastMessage *lastMessage = (LastMessage *)entry;
+                               
+                               if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == checkSlot->getSequenceNumber())) {
+                                       return true;
+                               }
+                       }
+               }
+       }
+       //Not found
+       return false;
+}
 
 /** Method tries to send slot to server.  Returns status in tuple.
                isInserted returns whether last un-acked send (if any) was
@@ -1169,39 +1192,8 @@ bool Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey, bool *isIn
                }
 
                if (hadPartialSendToServer) {
-                       *isInserted = false;
-                       uint size = (*array)->length();
-                       for (uint i = 0; i < size; i++) {
-                               Slot *s = (*array)->get(i);
-                               if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
-                                       *isInserted = true;
-                                       break;
-                               }
-                       }
+                       *isInserted = checkSend(*array, slot);
 
-                       //Also need to see if other machines acknowledged our message
-                       if (!(*isInserted)) {
-                               for (uint i = 0; i < size; i++) {
-                                       Slot *s = (*array)->get(i);
-                                       
-                                       // Process each entry in the slot
-                                       Vector<Entry *> *entries = s->getEntries();
-                                       uint eSize = entries->size();
-                                       for (uint ei = 0; ei < eSize; ei++) {
-                                               Entry *entry = entries->get(ei);
-                                               
-                                               if (entry->getType() == TypeLastMessage) {
-                                                       LastMessage *lastMessage = (LastMessage *)entry;
-                                                       
-                                                       if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) {
-                                                               *isInserted = true;
-                                                               goto done;
-                                                       }
-                                               }
-                                       }
-                               }
-                       }
-               done:
                        if (!(*isInserted)) {
                                rejectedSlotVector->add(slot->getSequenceNumber());
                        }
@@ -1216,10 +1208,10 @@ bool Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey, bool *isIn
 }
 
 /**
- * Returns false if a resize was needed
+ * Returns true if a resize was needed but not done.
  */
-ThreeTuple<bool, int32_t, bool> Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) {
-       int newSize = 0;
+bool Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry, int & newSize, bool & insertedKey) {
+       newSize = 0;//special value to indicate no resize
        if (liveSlotCount > bufferResizeThreshold) {
                resize = true;//Resize is forced
        }
@@ -1227,14 +1219,14 @@ ThreeTuple<bool, int32_t, bool> Table::fillSlot(Slot *slot, bool resize, NewKey
        if (resize) {
                newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
                TableStatus *status = new TableStatus(slot, newSize);
-               slot->addEntry(status);
+               slot->addShallowEntry(status);
        }
 
        // Fill with rejected slots first before doing anything else
        doRejectedMessages(slot);
 
        // Do mandatory rescue of entries
-       ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
+       ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryRescue(slot, resize);
 
        // Extract working variables
        bool needsResize = mandatoryRescueReturn.getFirst();
@@ -1242,22 +1234,21 @@ ThreeTuple<bool, int32_t, bool> Table::fillSlot(Slot *slot, bool resize, NewKey
        int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
 
        if (needsResize && !resize) {
-               // We need to resize but we are not resizing so return false
-               return ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
+               // We need to resize but we are not resizing so return true to force on retry
+               return true;
        }
 
-       bool inserted = false;
+       insertedKey = false;
        if (newKeyEntry != NULL) {
                newKeyEntry->setSlot(slot);
                if (slot->hasSpace(newKeyEntry)) {
                        slot->addEntry(newKeyEntry);
-                       inserted = true;
+                       insertedKey = true;
                }
        }
 
        // Clear the transactions, aborts and commits that were sent previously
-       transactionPartsSent->clear();
-       pendingSendArbitrationEntriesToDelete->clear();
+       clearSentParts();
        uint size = pendingSendArbitrationRounds->size();
        for (uint i = 0; i < size; i++) {
                ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
@@ -1323,7 +1314,7 @@ ThreeTuple<bool, int32_t, bool> Table::fillSlot(Slot *slot, bool resize, NewKey
        // Fill the remainder of the slot with rescue data
        doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
 
-       return ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
+       return false;
 }
 
 void Table::doRejectedMessages(Slot *s) {
@@ -1336,7 +1327,7 @@ void Table::doRejectedMessages(Slot *s) {
                if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
                        int64_t new_seqn = rejectedSlotVector->lastElement();
                        RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
-                       s->addEntry(rm);
+                       s->addShallowEntry(rm);
                } else {
                        int64_t prev_seqn = -1;
                        uint i = 0;
@@ -1351,7 +1342,7 @@ void Table::doRejectedMessages(Slot *s) {
                        /* Generate rejected message entry for missing messages */
                        if (prev_seqn != -1) {
                                RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
-                               s->addEntry(rm);
+                               s->addShallowEntry(rm);
                        }
                        /* Generate rejected message entries for present messages */
                        for (; i < rejectedSlotVector->size(); i++) {
@@ -1359,13 +1350,13 @@ void Table::doRejectedMessages(Slot *s) {
                                Slot *s_msg = buffer->getSlot(curr_seqn);
                                int64_t machineid = s_msg->getMachineID();
                                RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
-                               s->addEntry(rm);
+                               s->addShallowEntry(rm);
                        }
                }
        }
 }
 
-ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
+ThreeTuple<bool, bool, int64_t> Table::doMandatoryRescue(Slot *slot, bool resize) {
        int64_t newestSequenceNumber = buffer->getNewestSeqNum();
        int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
        if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
@@ -1437,10 +1428,13 @@ void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resi
                                s->addEntry(liveentry);
                        else {
                                skipcount++;
-                               if (skipcount > Table_SKIP_THRESHOLD)
+                               if (skipcount > Table_SKIP_THRESHOLD) {
+                                       delete liveentries;
                                        goto donesearch;
+                               }
                        }
                }
+               delete liveentries;
        }
 donesearch:
        ;
@@ -1488,7 +1482,8 @@ void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal
                        updateExpectedSize();
                }
        }
-
+       delete indexer;
+       
        // If there is a gap, check to see if the server sent us
        // everything->
        if (firstSeqNum != (sequenceNumber + 1)) {
@@ -1501,10 +1496,11 @@ void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal
                // must have a last message message-> If not then the server is
                // hiding slots
                if (!machineSet->isEmpty()) {
+                       delete machineSet;
                        throw new Error("Missing record for machines: ");
                }
        }
-
+       delete machineSet;
        // Update the size of our local block chain->
        commitNewMaxSize();
 
@@ -1655,25 +1651,33 @@ void Table::processNewTransactionParts() {
                        if (transaction == NULL) {
                                // This is a new transaction that we dont have so make a new one
                                transaction = new Transaction();
+                               
+                               // Add that part to the transaction
+                               transaction->addPartDecode(part);
 
                                // Insert this new transaction into the live tables
                                liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
-                               liveTransactionByTransactionIdTable->put(new Pair<int64_t, int64_t>(part->getTransactionId()), transaction);
+                               liveTransactionByTransactionIdTable->put(transaction->getId(), transaction);
                        }
-
-                       // Add that part to the transaction
-                       transaction->addPartDecode(part);
                }
                delete ptit;
        }
        delete tpit;
        // Clear all the new transaction parts in preparation for the next
        // time the server sends slots
-       newTransactionParts->clear();
+       {
+               SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts);
+               while (partsit->hasNext()) {
+                       int64_t machineId = partsit->next();
+                       Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
+                       delete parts;
+               }
+               delete partsit;
+               newTransactionParts->clear();
+       }
 }
 
 void Table::arbitrateFromServer() {
-
        if (liveTransactionBySequenceNumberTable->size() == 0) {
                // Nothing to arbitrate on so move on
                return;
@@ -1715,14 +1719,12 @@ void Table::arbitrateFromServer() {
                        continue;
                }
 
-
                if (!transaction->isComplete()) {
                        // Will arbitrate in incorrect order if we continue so just break
                        // Most likely this
                        break;
                }
 
-
                // update the largest transaction seen by arbitrator from server
                if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
                        lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
@@ -1735,7 +1737,6 @@ void Table::arbitrateFromServer() {
 
                if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
                        // Guard evaluated as true
-
                        // Update the local changes so we can make the commit
                        SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
                        while (kvit->hasNext()) {
@@ -1765,6 +1766,8 @@ void Table::arbitrateFromServer() {
                lastSeqNumArbOn = transactionSequenceNumber;
        }
 
+       delete transactionSequenceNumbers;
+
        Commit *newCommit = NULL;
 
        // If there is something to commit
@@ -1781,7 +1784,7 @@ void Table::arbitrateFromServer() {
                        newCommit->addKV(kv);
                }
                delete spit;
-
+               
                // create the commit parts
                newCommit->createCommitParts();
 
@@ -1795,6 +1798,7 @@ void Table::arbitrateFromServer() {
                        processEntry(commitPart);
                }
        }
+       delete speculativeTableTmp;
 
        if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
                ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
@@ -1811,6 +1815,8 @@ void Table::arbitrateFromServer() {
                                }
                        }
                }
+       } else {
+               delete generatedAborts;
        }
 }
 
@@ -1848,7 +1854,7 @@ Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
                SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
                while (kvit->hasNext()) {
                        KeyValue *kv = kvit->next();
-                       newCommit->addKV(kv);
+                       newCommit->addKV(kv->getCopy());
                }
                delete kvit;
 
@@ -1931,7 +1937,7 @@ Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
 }
 
 /**
- * Compacts the arbitration data my merging commits and aggregating
+ * Compacts the arbitration data by merging commits and aggregating
  * aborts so that a single large push of commits can be done instead
  * of many small updates
  */
@@ -1950,6 +1956,7 @@ bool Table::compactArbitrationData() {
        bool gotNewCommit = false;
 
        uint numberToDelete = 1;
+       
        while (numberToDelete < pendingSendArbitrationRounds->size()) {
                ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
 
@@ -1981,11 +1988,23 @@ bool Table::compactArbitrationData() {
                        newSize += round->getAbortsCount();
 
                        if (newSize > ArbitrationRound_MAX_PARTS) {
-                               // Cant compact since it would be too large
+                               // Can't compact since it would be too large
+                               if (lastRound->getCommit() != newCommit &&
+                                               round->getCommit() != newCommit)
+                                       delete newCommit;
                                break;
                        }
-
                        // Set the new compacted part
+                       if (lastRound->getCommit() == newCommit)
+                               lastRound->setCommit(NULL);
+                       if (round->getCommit() == newCommit)
+                               round->setCommit(NULL);
+                       
+                       if (lastRound->getCommit() != NULL) {
+                               Commit * oldcommit = lastRound->getCommit();
+                               lastRound->setCommit(NULL);
+                               delete oldcommit;
+                       }
                        lastRound->setCommit(newCommit);
                        lastRound->addAborts(round->getAborts());
                        gotNewCommit = true;
@@ -1997,15 +2016,11 @@ bool Table::compactArbitrationData() {
        if (numberToDelete != 1) {
                // If there is a compaction
                // Delete the previous pieces that are now in the new compacted piece
-               if (numberToDelete == pendingSendArbitrationRounds->size()) {
-                       pendingSendArbitrationRounds->clear();
-               } else {
-                       for (uint i = 0; i < numberToDelete; i++) {
-                               pendingSendArbitrationRounds->removeIndex(pendingSendArbitrationRounds->size() - 1);
-                       }
+               for (uint i = 2; i <= numberToDelete; i++) {
+                       delete pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size()-i);
                }
+               pendingSendArbitrationRounds->setSize(pendingSendArbitrationRounds->size() - numberToDelete);
 
-               // Add the new compacted into the pending to send list
                pendingSendArbitrationRounds->add(lastRound);
 
                // Should reinsert into the commit processor
@@ -2022,7 +2037,6 @@ bool Table::compactArbitrationData() {
  * transactions
  */
 bool Table::updateCommittedTable() {
-
        if (newCommitParts->size() == 0) {
                // Nothing new to process
                return false;
@@ -2063,6 +2077,7 @@ bool Table::updateCommittedTable() {
                        commit->addPartDecode(part);
                }
                delete pairit;
+               delete parts;
        }
        delete partsit;
 
@@ -2077,7 +2092,6 @@ bool Table::updateCommittedTable() {
        SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
        while (liveit->hasNext()) {
                int64_t arbitratorId = liveit->next();
-
                // Get all the commits for a specific arbitrator
                Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
 
@@ -2102,7 +2116,6 @@ bool Table::updateCommittedTable() {
                for (uint i = 0; i < commitSequenceNumbers->size(); i++) {
                        int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
                        Commit *commit = commitForClientTable->get(commitSequenceNumber);
-
                        // Special processing if a commit is not complete
                        if (!commit->isComplete()) {
                                if (i == (commitSequenceNumbers->size() - 1)) {
@@ -2116,6 +2129,7 @@ bool Table::updateCommittedTable() {
                                        // Delete it and move on
                                        commit->setDead();
                                        commitForClientTable->remove(commit->getSequenceNumber());
+                                       delete commit;
                                        continue;
                                }
                        }
@@ -2143,7 +2157,6 @@ bool Table::updateCommittedTable() {
                        // We have already seen this commit before so need to do the
                        // full processing on this commit
                        if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
-
                                // Update the last transaction that was updated if we can
                                if (commit->getTransactionSequenceNumber() != -1) {
                                        int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
@@ -2152,7 +2165,6 @@ bool Table::updateCommittedTable() {
                                                lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
                                        }
                                }
-
                                continue;
                        }
 
@@ -2193,10 +2205,12 @@ bool Table::updateCommittedTable() {
                                        // if the commit is now dead then remove it
                                        if (!previousCommit->isLive()) {
                                                commitForClientTable->remove(previousCommit->getSequenceNumber());
+                                               delete previousCommit;
                                        }
                                }
                        }
                        delete commitit;
+                       delete commitsToEdit;
 
                        // Update the last seen sequence number from this arbitrator
                        if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) {
@@ -2221,6 +2235,7 @@ bool Table::updateCommittedTable() {
                                delete kvit;
                        }
                }
+               delete commitSequenceNumbers;
        }
        delete liveit;
 
@@ -2277,6 +2292,7 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
 
        if (startIndex >= transactionSequenceNumbersSorted->size()) {
                // Make sure we are not out of bounds
+               delete transactionSequenceNumbersSorted;
                return false;           // did not speculate
        }
 
@@ -2315,6 +2331,8 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
                }
        }
 
+       delete transactionSequenceNumbersSorted;
+       
        if (didSkip) {
                // Since there was a skip we need to redo the speculation next time around
                lastTransactionSequenceNumberSpeculatedOn = -1;
@@ -2384,13 +2402,15 @@ void Table::updateLiveTransactionsAndStatus() {
                        Transaction *transaction = liveTransactionBySequenceNumberTable->get(key);
 
                        // Check if the transaction is dead
-                       if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator()) && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator() >= transaction->getSequenceNumber())) {
+                       if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator())
+                                       && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator()) >= transaction->getSequenceNumber()) {
                                // Set dead the transaction
                                transaction->setDead();
 
                                // Remove the transaction from the live table
                                iter->remove();
                                liveTransactionByTransactionIdTable->remove(transaction->getId());
+                               delete transaction;
                        }
                }
                delete iter;
@@ -2404,8 +2424,8 @@ void Table::updateLiveTransactionsAndStatus() {
                        TransactionStatus *status = outstandingTransactionStatus->get(key);
 
                        // Check if the transaction is dead
-                       if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator()) && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) {
-
+                       if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator())
+                                       && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) {
                                // Set committed
                                status->setStatus(TransactionStatus_StatusCommitted);
 
@@ -2561,6 +2581,7 @@ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
        if (deviceWatchSet->isEmpty()) {
                // This rejected message has been seen by all the clients so
                entry->setDead();
+               delete deviceWatchSet;
        } else {
                // We need to watch this rejected message
                entry->setWatchSet(deviceWatchSet);
@@ -2659,7 +2680,7 @@ void Table::processEntry(TransactionPart *entry) {
 
        // Update the part and set dead ones we have already seen (got a
        // rescued version)
-       TransactionPart *previouslySeenPart = transactionPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
+       TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
        if (previouslySeenPart != NULL) {
                previouslySeenPart->setDead();
        }
@@ -2671,7 +2692,8 @@ void Table::processEntry(TransactionPart *entry) {
 void Table::processEntry(CommitPart *entry) {
        // Update the last transaction that was updated if we can
        if (entry->getTransactionSequenceNumber() != -1) {
-               if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId() || lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber())) {
+               if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId()) ||
+                               lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber()) {
                        lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
                }
        }
@@ -2684,7 +2706,7 @@ void Table::processEntry(CommitPart *entry) {
        }
        // Update the part and set dead ones we have already seen (got a
        // rescued version)
-       CommitPart *previouslySeenPart = commitPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
+       CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
        if (previouslySeenPart != NULL) {
                previouslySeenPart->setDead();
        }