edits
[iotcloud.git] / version2 / src / C / Table.cc
index a5660091fd614018bc10f7966e0b8cee66c83308..531337cc5336f1c69dac6aed68018b2899355c98 100644 (file)
@@ -167,7 +167,16 @@ Table::~Table() {
        delete speculatedKeyValueTable;
        delete pendingTransactionSpeculatedKeyValueTable;
        delete liveNewKeyTable;
-       delete lastMessageTable;
+       {
+               SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
+               while (lmit->hasNext()) {
+                       Pair<int64_t, Liveness *> * pair = lastMessageTable->get(lmit->next());
+               }
+               delete lmit;
+               delete lastMessageTable;
+       }
+       if (pendingTransactionBuilder != NULL)
+               delete pendingTransactionBuilder;
        {
                SetIterator<int64_t, Hashset<RejectedMessage *> *> *rmit = getKeyIterator(rejectedMessageWatchVectorTable);
                while(rmit->hasNext()) {
@@ -233,7 +242,13 @@ Table::~Table() {
        delete liveCommitsByKeyTable;
        delete lastCommitSeenSequenceNumberByArbitratorTable;
        delete rejectedSlotVector;
-       delete pendingTransactionQueue;
+       {
+               uint size = pendingTransactionQueue->size();
+               for (uint iter = 0; iter < size; iter++) {
+                       delete pendingTransactionQueue->get(iter);
+               }
+               delete pendingTransactionQueue;
+       }
        delete pendingSendArbitrationEntriesToDelete;
        delete transactionPartsSent;
        delete outstandingTransactionStatus;
@@ -241,7 +256,12 @@ Table::~Table() {
        delete offlineTransactionsCommittedAndAtServer;
        delete localCommunicationTable;
        delete lastTransactionSeenFromMachineFromServer;
-       delete pendingSendArbitrationRounds;
+       {
+               for(uint i = 0; i < pendingSendArbitrationRounds->size(); i++) {
+                       delete pendingSendArbitrationRounds->get(i);
+               }
+               delete pendingSendArbitrationRounds;
+       }
        if (lastTransactionPartsSent != NULL)
                delete lastTransactionPartsSent;
        delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
@@ -461,6 +481,8 @@ bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
 
 void Table::startTransaction() {
        // Create a new transaction, invalidates any old pending transactions.
+       if (pendingTransactionBuilder != NULL)
+               delete pendingTransactionBuilder;
        pendingTransactionBuilder = new PendingTransaction(localMachineId);
 }
 
@@ -503,9 +525,12 @@ TransactionStatus *Table::commitTransaction() {
                pendingTransactionQueue->add(newTransaction);
        } else {
                arbitrateOnLocalTransaction(newTransaction);
+               delete newTransaction;
                updateLiveStateFromLocal();
        }
-
+       if (pendingTransactionBuilder != NULL)
+               delete pendingTransactionBuilder;
+       
        pendingTransactionBuilder = new PendingTransaction(localMachineId);
 
        try {
@@ -535,6 +560,7 @@ TransactionStatus *Table::commitTransaction() {
 
                                if (sendReturn.getSecond()) {
                                        // did arbitrate
+                                       delete transaction;
                                        oldindex--;
                                }
                        }
@@ -722,7 +748,7 @@ bool Table::sendToServer(NewKey *newKey) {
                        }
 
                        // Create the slot
-                       Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
+                       Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, new Array<char>(buffer->getSlot(sequenceNumber)->getHMAC()), localSequenceNumber);
                        localSequenceNumber++;
 
                        // Try to fill the slot with data
@@ -785,7 +811,8 @@ bool Table::sendToServer(NewKey *newKey) {
                                                //Add part back in
                                                pendingSendArbitrationRounds->set(oldcount++,
                                                                                                                                                                                        pendingSendArbitrationRounds->get(i));
-                                       }
+                                       } else
+                                               delete pendingSendArbitrationRounds->get(i);
                                }
                                pendingSendArbitrationRounds->setSize(oldcount);
 
@@ -1437,10 +1464,13 @@ void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resi
                                s->addEntry(liveentry);
                        else {
                                skipcount++;
-                               if (skipcount > Table_SKIP_THRESHOLD)
+                               if (skipcount > Table_SKIP_THRESHOLD) {
+                                       delete liveentries;
                                        goto donesearch;
+                               }
                        }
                }
+               delete liveentries;
        }
 donesearch:
        ;
@@ -1502,10 +1532,11 @@ void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal
                // must have a last message message-> If not then the server is
                // hiding slots
                if (!machineSet->isEmpty()) {
+                       delete machineSet;
                        throw new Error("Missing record for machines: ");
                }
        }
-
+       delete machineSet;
        // Update the size of our local block chain->
        commitNewMaxSize();
 
@@ -1656,14 +1687,14 @@ void Table::processNewTransactionParts() {
                        if (transaction == NULL) {
                                // This is a new transaction that we dont have so make a new one
                                transaction = new Transaction();
+                               
+                               // Add that part to the transaction
+                               transaction->addPartDecode(part);
 
                                // Insert this new transaction into the live tables
                                liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
-                               liveTransactionByTransactionIdTable->put(new Pair<int64_t, int64_t>(part->getTransactionId()), transaction);
+                               liveTransactionByTransactionIdTable->put(transaction->getId(), transaction);
                        }
-
-                       // Add that part to the transaction
-                       transaction->addPartDecode(part);
                }
                delete ptit;
        }
@@ -1724,14 +1755,12 @@ void Table::arbitrateFromServer() {
                        continue;
                }
 
-
                if (!transaction->isComplete()) {
                        // Will arbitrate in incorrect order if we continue so just break
                        // Most likely this
                        break;
                }
 
-
                // update the largest transaction seen by arbitrator from server
                if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
                        lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
@@ -1744,7 +1773,6 @@ void Table::arbitrateFromServer() {
 
                if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
                        // Guard evaluated as true
-
                        // Update the local changes so we can make the commit
                        SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
                        while (kvit->hasNext()) {
@@ -1941,7 +1969,7 @@ Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
 }
 
 /**
- * Compacts the arbitration data my merging commits and aggregating
+ * Compacts the arbitration data by merging commits and aggregating
  * aborts so that a single large push of commits can be done instead
  * of many small updates
  */
@@ -1960,6 +1988,7 @@ bool Table::compactArbitrationData() {
        bool gotNewCommit = false;
 
        uint numberToDelete = 1;
+       
        while (numberToDelete < pendingSendArbitrationRounds->size()) {
                ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
 
@@ -1991,11 +2020,23 @@ bool Table::compactArbitrationData() {
                        newSize += round->getAbortsCount();
 
                        if (newSize > ArbitrationRound_MAX_PARTS) {
-                               // Cant compact since it would be too large
+                               // Can't compact since it would be too large
+                               if (lastRound->getCommit() != newCommit &&
+                                               round->getCommit() != newCommit)
+                                       delete newCommit;
                                break;
                        }
-
                        // Set the new compacted part
+                       if (lastRound->getCommit() == newCommit)
+                               lastRound->setCommit(NULL);
+                       if (round->getCommit() == newCommit)
+                               round->setCommit(NULL);
+                       
+                       if (lastRound->getCommit() != NULL) {
+                               Commit * oldcommit = lastRound->getCommit();
+                               lastRound->setCommit(NULL);
+                               delete oldcommit;
+                       }
                        lastRound->setCommit(newCommit);
                        lastRound->addAborts(round->getAborts());
                        gotNewCommit = true;
@@ -2007,15 +2048,11 @@ bool Table::compactArbitrationData() {
        if (numberToDelete != 1) {
                // If there is a compaction
                // Delete the previous pieces that are now in the new compacted piece
-               if (numberToDelete == pendingSendArbitrationRounds->size()) {
-                       pendingSendArbitrationRounds->clear();
-               } else {
-                       for (uint i = 0; i < numberToDelete; i++) {
-                               pendingSendArbitrationRounds->removeIndex(pendingSendArbitrationRounds->size() - 1);
-                       }
+               for (uint i = 2; i <= numberToDelete; i++) {
+                       delete pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size()-i);
                }
+               pendingSendArbitrationRounds->setSize(pendingSendArbitrationRounds->size() - numberToDelete);
 
-               // Add the new compacted into the pending to send list
                pendingSendArbitrationRounds->add(lastRound);
 
                // Should reinsert into the commit processor
@@ -2087,7 +2124,6 @@ bool Table::updateCommittedTable() {
        SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
        while (liveit->hasNext()) {
                int64_t arbitratorId = liveit->next();
-
                // Get all the commits for a specific arbitrator
                Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
 
@@ -2112,7 +2148,6 @@ bool Table::updateCommittedTable() {
                for (uint i = 0; i < commitSequenceNumbers->size(); i++) {
                        int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
                        Commit *commit = commitForClientTable->get(commitSequenceNumber);
-
                        // Special processing if a commit is not complete
                        if (!commit->isComplete()) {
                                if (i == (commitSequenceNumbers->size() - 1)) {
@@ -2154,7 +2189,6 @@ bool Table::updateCommittedTable() {
                        // We have already seen this commit before so need to do the
                        // full processing on this commit
                        if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
-
                                // Update the last transaction that was updated if we can
                                if (commit->getTransactionSequenceNumber() != -1) {
                                        int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
@@ -2163,7 +2197,6 @@ bool Table::updateCommittedTable() {
                                                lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
                                        }
                                }
-
                                continue;
                        }
 
@@ -2209,6 +2242,7 @@ bool Table::updateCommittedTable() {
                                }
                        }
                        delete commitit;
+                       delete commitsToEdit;
 
                        // Update the last seen sequence number from this arbitrator
                        if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) {
@@ -2575,6 +2609,7 @@ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
        if (deviceWatchSet->isEmpty()) {
                // This rejected message has been seen by all the clients so
                entry->setDead();
+               delete deviceWatchSet;
        } else {
                // We need to watch this rejected message
                entry->setWatchSet(deviceWatchSet);
@@ -2685,7 +2720,8 @@ void Table::processEntry(TransactionPart *entry) {
 void Table::processEntry(CommitPart *entry) {
        // Update the last transaction that was updated if we can
        if (entry->getTransactionSequenceNumber() != -1) {
-               if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId() || lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber())) {
+               if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId()) ||
+                               lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber()) {
                        lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
                }
        }
@@ -2698,7 +2734,7 @@ void Table::processEntry(CommitPart *entry) {
        }
        // Update the part and set dead ones we have already seen (got a
        // rescued version)
-       CommitPart *previouslySeenPart = commitPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
+       CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
        if (previouslySeenPart != NULL) {
                previouslySeenPart->setDead();
        }
@@ -2732,7 +2768,6 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liven
                                rmit->remove();
                                // Decrement machines that need to see this notification
                                rm->removeWatcher(machineId);
-                               delete rm;
                        }
                }
                delete rmit;