edits
[iotcloud.git] / version2 / src / C / Table.cc
index 15dee7683a05545c1c9f0aea98484ba4a0d114d3..1a33f76f1f1bc98f7e8c267e90e4ddc515655bc0 100644 (file)
@@ -60,7 +60,6 @@ Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, i
        lastIsNewKey(false),
        lastNewSize(0),
        lastTransactionPartsSent(NULL),
-       lastPendingSendArbitrationEntriesToDelete(NULL),
        lastNewKey(NULL),
        committedKeyValueTable(NULL),
        speculatedKeyValueTable(NULL),
@@ -123,7 +122,6 @@ Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
        lastIsNewKey(false),
        lastNewSize(0),
        lastTransactionPartsSent(NULL),
-       lastPendingSendArbitrationEntriesToDelete(NULL),
        lastNewKey(NULL),
        committedKeyValueTable(NULL),
        speculatedKeyValueTable(NULL),
@@ -171,6 +169,7 @@ Table::~Table() {
                SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
                while (lmit->hasNext()) {
                        Pair<int64_t, Liveness *> * pair = lastMessageTable->get(lmit->next());
+                       delete pair;
                }
                delete lmit;
                delete lastMessageTable;
@@ -199,7 +198,7 @@ Table::~Table() {
                SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts);
                while (partsit->hasNext()) {
                        int64_t machineId = partsit->next();
-                       Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
+                       Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal();
                        delete parts;
                }
                delete partsit;
@@ -209,7 +208,7 @@ Table::~Table() {
                SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
                while (partsit->hasNext()) {
                        int64_t machineId = partsit->next();
-                       Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId);
+                       Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal();
                        delete parts;
                }
                delete partsit;
@@ -224,7 +223,7 @@ Table::~Table() {
                        int64_t arbitratorId = liveit->next();
                        
                        // Get all the commits for a specific arbitrator
-                       Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
+                       Hashtable<int64_t, Commit *> *commitForClientTable = liveit->currVal();
                        {
                                SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
                                while (clientit->hasNext()) {
@@ -250,7 +249,15 @@ Table::~Table() {
                delete pendingTransactionQueue;
        }
        delete pendingSendArbitrationEntriesToDelete;
-       delete transactionPartsSent;
+       {
+               SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
+               while (trit->hasNext()) {
+                       Transaction *transaction = trit->next();
+                       delete trit->currVal();
+               }
+               delete trit;
+               delete transactionPartsSent;
+       }
        delete outstandingTransactionStatus;
        delete liveAbortsGeneratedByLocal;
        delete offlineTransactionsCommittedAndAtServer;
@@ -265,6 +272,8 @@ Table::~Table() {
        if (lastTransactionPartsSent != NULL)
                delete lastTransactionPartsSent;
        delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
+       if (lastNewKey)
+               delete lastNewKey;
 }
 
 /**
@@ -320,7 +329,7 @@ void Table::initTable() {
        Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
        localSequenceNumber++;
        TableStatus *status = new TableStatus(s, numberOfSlots);
-       s->addEntry(status);
+       s->addShallowEntry(status);
        Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
 
        if (array == NULL) {
@@ -332,8 +341,10 @@ void Table::initTable() {
        } else if (array->length() == 1) {
                // in case we did push the slot BUT we failed to init it
                validateAndUpdate(array, true);
+               delete s;
                delete array;
        } else {
+               delete s;
                delete array;
                throw new Error("Error on initialization");
        }
@@ -585,6 +596,36 @@ int64_t Table::getLocalSequenceNumber() {
        return localSequenceNumber;
 }
 
+void Table::processTransactionList(bool handlePartial) {
+       SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
+       while (trit->hasNext()) {
+               Transaction *transaction = trit->next();
+               transaction->resetServerFailure();
+               // Update which transactions parts still need to be sent
+               transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
+               // Add the transaction status to the outstanding list
+               outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
+               
+               // Update the transaction status
+               transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
+               
+               // Check if all the transaction parts were successfully
+               // sent and if so then remove it from pending
+               if (transaction->didSendAllParts()) {
+                       transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
+                       pendingTransactionQueue->remove(transaction);
+                       delete transaction;
+               } else if (handlePartial) {
+                       transaction->resetServerFailure();
+                       // Set the transaction sequence number back to nothing
+                       if (!transaction->didSendAPartToServer()) {
+                               transaction->setSequenceNumber(-1);
+                       }
+               }
+       }
+       delete trit;
+}
+
 NewKey * Table::handlePartialSend(NewKey * newKey) {
        //Didn't receive acknowledgement for last send
        //See if the server has received a newer slot
@@ -596,67 +637,23 @@ NewKey * Table::handlePartialSend(NewKey * newKey) {
                bool sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey, &wasInserted, &newSlots);
                
                if (sendSlotsReturn) {
+                       lastSlotAttemptedToSend = NULL;
                        if (newKey != NULL) {
                                if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
+                                       delete newKey;
                                        newKey = NULL;
                                }
                        }
-                       
-                       SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
-                       while (trit->hasNext()) {
-                               Transaction *transaction = trit->next();
-                               transaction->resetServerFailure();
-                               // Update which transactions parts still need to be sent
-                               transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
-                               // Add the transaction status to the outstanding list
-                               outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
-                               
-                               // Update the transaction status
-                               transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
-                               
-                               // Check if all the transaction parts were successfully
-                               // sent and if so then remove it from pending
-                               if (transaction->didSendAllParts()) {
-                                       transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
-                                       pendingTransactionQueue->remove(transaction);
-                               }
-                       }
-                       delete trit;
+                       processTransactionList(false);
                } else {
                        if (checkSend(newSlots, lastSlotAttemptedToSend)) {
                                if (newKey != NULL) {
                                        if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
+                                               delete newKey;
                                                newKey = NULL;
                                        }
                                }
-                               
-                               SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
-                               while (trit->hasNext()) {
-                                       Transaction *transaction = trit->next();
-                                       transaction->resetServerFailure();
-                                       
-                                       // Update which transactions parts still need to be sent
-                                       transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
-                                       
-                                       // Add the transaction status to the outstanding list
-                                       outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
-                                       
-                                       // Update the transaction status
-                                       transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
-                                       
-                                       // Check if all the transaction parts were successfully sent and if so then remove it from pending
-                                       if (transaction->didSendAllParts()) {
-                                               transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
-                                               pendingTransactionQueue->remove(transaction);
-                                       } else {
-                                               transaction->resetServerFailure();
-                                               // Set the transaction sequence number back to nothing
-                                               if (!transaction->didSendAPartToServer()) {
-                                                       transaction->setSequenceNumber(-1);
-                                               }
-                                       }
-                               }
-                               delete trit;
+                               processTransactionList(true);
                        }
                }
                
@@ -679,37 +676,12 @@ NewKey * Table::handlePartialSend(NewKey * newKey) {
                if (checkSend(newSlots, lastSlotAttemptedToSend)) {
                        if (newKey != NULL) {
                                if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
+                                       delete newKey;
                                        newKey = NULL;
                                }
                        }
-                       
-                       SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
-                       while (trit->hasNext()) {
-                               Transaction *transaction = trit->next();
-                               transaction->resetServerFailure();
-                               
-                               // Update which transactions parts still need to be sent
-                               transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
-                               
-                               // Add the transaction status to the outstanding list
-                               outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
-                               
-                               // Update the transaction status
-                               transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
-                               
-                               // Check if all the transaction parts were successfully sent and if so then remove it from pending
-                               if (transaction->didSendAllParts()) {
-                                       transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
-                                       pendingTransactionQueue->remove(transaction);
-                               } else {
-                                       transaction->resetServerFailure();
-                                       // Set the transaction sequence number back to nothing
-                                       if (!transaction->didSendAPartToServer()) {
-                                               transaction->setSequenceNumber(-1);
-                                       }
-                               }
-                       }
-                       delete trit;
+
+                       processTransactionList(true);
                } else {
                        SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
                        while (trit->hasNext()) {
@@ -730,6 +702,18 @@ NewKey * Table::handlePartialSend(NewKey * newKey) {
        return newKey;
 }
 
+void Table::clearSentParts() {
+       // Clear the sent data since we are trying again
+       pendingSendArbitrationEntriesToDelete->clear();
+       SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
+       while (trit->hasNext()) {
+               Transaction *transaction = trit->next();
+               delete trit->currVal();
+       }
+       delete trit;
+       transactionPartsSent->clear();
+}
+
 bool Table::sendToServer(NewKey *newKey) {
        if (hadPartialSendToServer) {
                newKey = handlePartialSend(newKey);
@@ -744,6 +728,7 @@ bool Table::sendToServer(NewKey *newKey) {
                        
                        // If there is a new key with same name then end
                        if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
+                               delete newKey;
                                return false;
                        }
 
@@ -771,28 +756,31 @@ bool Table::sendToServer(NewKey *newKey) {
                                delete trit;
 
                                // Clear the sent data since we are trying again
-                               pendingSendArbitrationEntriesToDelete->clear();
-                               transactionPartsSent->clear();
-
+                               clearSentParts();
+                                       
                                // We needed a resize so try again
                                fillSlot(slot, true, newKey, newSize, insertedNewKey);
                        }
-
+                       if (lastSlotAttemptedToSend != NULL)
+                               delete lastSlotAttemptedToSend;
+                       
                        lastSlotAttemptedToSend = slot;
                        lastIsNewKey = (newKey != NULL);
                        lastInsertedNewKey = insertedNewKey;
                        lastNewSize = newSize;
+                       if (( newKey != lastNewKey) && (lastNewKey != NULL))
+                               delete lastNewKey;
                        lastNewKey = newKey;
                        if (lastTransactionPartsSent != NULL)
                                delete lastTransactionPartsSent;
                        lastTransactionPartsSent = transactionPartsSent->clone();
-                       lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
 
                        Array<Slot *> * newSlots = NULL;
                        bool wasInserted = false;
                        bool sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL, &wasInserted, &newSlots);
 
                        if (sendSlotsReturn) {
+                               lastSlotAttemptedToSend = NULL;
                                // Did insert into the block chain
                                if (insertedNewKey) {
                                        // This slot was what was inserted not a previous slot
@@ -815,28 +803,7 @@ bool Table::sendToServer(NewKey *newKey) {
                                                delete pendingSendArbitrationRounds->get(i);
                                }
                                pendingSendArbitrationRounds->setSize(oldcount);
-
-                               SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
-                               while (trit->hasNext()) {
-                                       Transaction *transaction = trit->next();
-                                       transaction->resetServerFailure();
-
-                                       // Update which transactions parts still need to be sent
-                                       transaction->removeSentParts(transactionPartsSent->get(transaction));
-
-                                       // Add the transaction status to the outstanding list
-                                       outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
-
-                                       // Update the transaction status
-                                       transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
-
-                                       // Check if all the transaction parts were successfully sent and if so then remove it from pending
-                                       if (transaction->didSendAllParts()) {
-                                               transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
-                                               pendingTransactionQueue->remove(transaction);
-                                       }
-                               }
-                               delete trit;
+                               processTransactionList(false);
                        } else {
                                // Reset which transaction to send
                                SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
@@ -853,8 +820,7 @@ bool Table::sendToServer(NewKey *newKey) {
                        }
 
                        // Clear the sent data in preparation for next send
-                       pendingSendArbitrationEntriesToDelete->clear();
-                       transactionPartsSent->clear();
+                       clearSentParts();
 
                        if (newSlots->length() != 0) {
                                // insert into the local block chain
@@ -890,8 +856,7 @@ bool Table::sendToServer(NewKey *newKey) {
                        delete trit;
                }
 
-               pendingSendArbitrationEntriesToDelete->clear();
-               transactionPartsSent->clear();
+               clearSentParts();
 
                throw e;
        }
@@ -1254,14 +1219,14 @@ bool Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry, int & newSize
        if (resize) {
                newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
                TableStatus *status = new TableStatus(slot, newSize);
-               slot->addEntry(status);
+               slot->addShallowEntry(status);
        }
 
        // Fill with rejected slots first before doing anything else
        doRejectedMessages(slot);
 
        // Do mandatory rescue of entries
-       ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
+       ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryRescue(slot, resize);
 
        // Extract working variables
        bool needsResize = mandatoryRescueReturn.getFirst();
@@ -1283,8 +1248,7 @@ bool Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry, int & newSize
        }
 
        // Clear the transactions, aborts and commits that were sent previously
-       transactionPartsSent->clear();
-       pendingSendArbitrationEntriesToDelete->clear();
+       clearSentParts();
        uint size = pendingSendArbitrationRounds->size();
        for (uint i = 0; i < size; i++) {
                ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
@@ -1363,7 +1327,7 @@ void Table::doRejectedMessages(Slot *s) {
                if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
                        int64_t new_seqn = rejectedSlotVector->lastElement();
                        RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
-                       s->addEntry(rm);
+                       s->addShallowEntry(rm);
                } else {
                        int64_t prev_seqn = -1;
                        uint i = 0;
@@ -1378,7 +1342,7 @@ void Table::doRejectedMessages(Slot *s) {
                        /* Generate rejected message entry for missing messages */
                        if (prev_seqn != -1) {
                                RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
-                               s->addEntry(rm);
+                               s->addShallowEntry(rm);
                        }
                        /* Generate rejected message entries for present messages */
                        for (; i < rejectedSlotVector->size(); i++) {
@@ -1386,13 +1350,13 @@ void Table::doRejectedMessages(Slot *s) {
                                Slot *s_msg = buffer->getSlot(curr_seqn);
                                int64_t machineid = s_msg->getMachineID();
                                RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
-                               s->addEntry(rm);
+                               s->addShallowEntry(rm);
                        }
                }
        }
 }
 
-ThreeTuple<bool, bool, int64_t> Table::doMandatoryResuce(Slot *slot, bool resize) {
+ThreeTuple<bool, bool, int64_t> Table::doMandatoryRescue(Slot *slot, bool resize) {
        int64_t newestSequenceNumber = buffer->getNewestSeqNum();
        int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
        if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
@@ -1802,6 +1766,8 @@ void Table::arbitrateFromServer() {
                lastSeqNumArbOn = transactionSequenceNumber;
        }
 
+       delete transactionSequenceNumbers;
+
        Commit *newCommit = NULL;
 
        // If there is something to commit
@@ -1849,6 +1815,8 @@ void Table::arbitrateFromServer() {
                                }
                        }
                }
+       } else {
+               delete generatedAborts;
        }
 }
 
@@ -1886,7 +1854,7 @@ Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
                SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
                while (kvit->hasNext()) {
                        KeyValue *kv = kvit->next();
-                       newCommit->addKV(kv);
+                       newCommit->addKV(kv->getCopy());
                }
                delete kvit;
 
@@ -1988,6 +1956,7 @@ bool Table::compactArbitrationData() {
        bool gotNewCommit = false;
 
        uint numberToDelete = 1;
+       
        while (numberToDelete < pendingSendArbitrationRounds->size()) {
                ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
 
@@ -2019,10 +1988,12 @@ bool Table::compactArbitrationData() {
                        newSize += round->getAbortsCount();
 
                        if (newSize > ArbitrationRound_MAX_PARTS) {
-                               // Cant compact since it would be too large
+                               // Can't compact since it would be too large
+                               if (lastRound->getCommit() != newCommit &&
+                                               round->getCommit() != newCommit)
+                                       delete newCommit;
                                break;
                        }
-
                        // Set the new compacted part
                        if (lastRound->getCommit() == newCommit)
                                lastRound->setCommit(NULL);
@@ -2121,7 +2092,6 @@ bool Table::updateCommittedTable() {
        SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
        while (liveit->hasNext()) {
                int64_t arbitratorId = liveit->next();
-
                // Get all the commits for a specific arbitrator
                Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
 
@@ -2146,7 +2116,6 @@ bool Table::updateCommittedTable() {
                for (uint i = 0; i < commitSequenceNumbers->size(); i++) {
                        int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
                        Commit *commit = commitForClientTable->get(commitSequenceNumber);
-
                        // Special processing if a commit is not complete
                        if (!commit->isComplete()) {
                                if (i == (commitSequenceNumbers->size() - 1)) {
@@ -2260,17 +2229,13 @@ bool Table::updateCommittedTable() {
                                SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
                                while (kvit->hasNext()) {
                                        KeyValue *kv = kvit->next();
-                                       printf("Commited KeyValue Table update for %p\n", this);
-                                       kv->getKey()->print();
-                                       printf("\n");
-                                       kv->getValue()->print();
-                                       printf("\n");
                                        committedKeyValueTable->put(kv->getKey(), kv);
                                        liveCommitsByKeyTable->put(kv->getKey(), commit);
                                }
                                delete kvit;
                        }
                }
+               delete commitSequenceNumbers;
        }
        delete liveit;
 
@@ -2327,6 +2292,7 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
 
        if (startIndex >= transactionSequenceNumbersSorted->size()) {
                // Make sure we are not out of bounds
+               delete transactionSequenceNumbersSorted;
                return false;           // did not speculate
        }
 
@@ -2365,6 +2331,8 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
                }
        }
 
+       delete transactionSequenceNumbersSorted;
+       
        if (didSkip) {
                // Since there was a skip we need to redo the speculation next time around
                lastTransactionSequenceNumberSpeculatedOn = -1;
@@ -2712,7 +2680,7 @@ void Table::processEntry(TransactionPart *entry) {
 
        // Update the part and set dead ones we have already seen (got a
        // rescued version)
-       TransactionPart *previouslySeenPart = transactionPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
+       TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
        if (previouslySeenPart != NULL) {
                previouslySeenPart->setDead();
        }