edits
[iotcloud.git] / version2 / src / C / Table.cc
index a5660091fd614018bc10f7966e0b8cee66c83308..15dee7683a05545c1c9f0aea98484ba4a0d114d3 100644 (file)
@@ -167,7 +167,16 @@ Table::~Table() {
        delete speculatedKeyValueTable;
        delete pendingTransactionSpeculatedKeyValueTable;
        delete liveNewKeyTable;
        delete speculatedKeyValueTable;
        delete pendingTransactionSpeculatedKeyValueTable;
        delete liveNewKeyTable;
-       delete lastMessageTable;
+       {
+               SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
+               while (lmit->hasNext()) {
+                       Pair<int64_t, Liveness *> * pair = lastMessageTable->get(lmit->next());
+               }
+               delete lmit;
+               delete lastMessageTable;
+       }
+       if (pendingTransactionBuilder != NULL)
+               delete pendingTransactionBuilder;
        {
                SetIterator<int64_t, Hashset<RejectedMessage *> *> *rmit = getKeyIterator(rejectedMessageWatchVectorTable);
                while(rmit->hasNext()) {
        {
                SetIterator<int64_t, Hashset<RejectedMessage *> *> *rmit = getKeyIterator(rejectedMessageWatchVectorTable);
                while(rmit->hasNext()) {
@@ -233,7 +242,13 @@ Table::~Table() {
        delete liveCommitsByKeyTable;
        delete lastCommitSeenSequenceNumberByArbitratorTable;
        delete rejectedSlotVector;
        delete liveCommitsByKeyTable;
        delete lastCommitSeenSequenceNumberByArbitratorTable;
        delete rejectedSlotVector;
-       delete pendingTransactionQueue;
+       {
+               uint size = pendingTransactionQueue->size();
+               for (uint iter = 0; iter < size; iter++) {
+                       delete pendingTransactionQueue->get(iter);
+               }
+               delete pendingTransactionQueue;
+       }
        delete pendingSendArbitrationEntriesToDelete;
        delete transactionPartsSent;
        delete outstandingTransactionStatus;
        delete pendingSendArbitrationEntriesToDelete;
        delete transactionPartsSent;
        delete outstandingTransactionStatus;
@@ -241,7 +256,12 @@ Table::~Table() {
        delete offlineTransactionsCommittedAndAtServer;
        delete localCommunicationTable;
        delete lastTransactionSeenFromMachineFromServer;
        delete offlineTransactionsCommittedAndAtServer;
        delete localCommunicationTable;
        delete lastTransactionSeenFromMachineFromServer;
-       delete pendingSendArbitrationRounds;
+       {
+               for(uint i = 0; i < pendingSendArbitrationRounds->size(); i++) {
+                       delete pendingSendArbitrationRounds->get(i);
+               }
+               delete pendingSendArbitrationRounds;
+       }
        if (lastTransactionPartsSent != NULL)
                delete lastTransactionPartsSent;
        delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
        if (lastTransactionPartsSent != NULL)
                delete lastTransactionPartsSent;
        delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
@@ -461,6 +481,8 @@ bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
 
 void Table::startTransaction() {
        // Create a new transaction, invalidates any old pending transactions.
 
 void Table::startTransaction() {
        // Create a new transaction, invalidates any old pending transactions.
+       if (pendingTransactionBuilder != NULL)
+               delete pendingTransactionBuilder;
        pendingTransactionBuilder = new PendingTransaction(localMachineId);
 }
 
        pendingTransactionBuilder = new PendingTransaction(localMachineId);
 }
 
@@ -503,9 +525,12 @@ TransactionStatus *Table::commitTransaction() {
                pendingTransactionQueue->add(newTransaction);
        } else {
                arbitrateOnLocalTransaction(newTransaction);
                pendingTransactionQueue->add(newTransaction);
        } else {
                arbitrateOnLocalTransaction(newTransaction);
+               delete newTransaction;
                updateLiveStateFromLocal();
        }
                updateLiveStateFromLocal();
        }
-
+       if (pendingTransactionBuilder != NULL)
+               delete pendingTransactionBuilder;
+       
        pendingTransactionBuilder = new PendingTransaction(localMachineId);
 
        try {
        pendingTransactionBuilder = new PendingTransaction(localMachineId);
 
        try {
@@ -535,6 +560,7 @@ TransactionStatus *Table::commitTransaction() {
 
                                if (sendReturn.getSecond()) {
                                        // did arbitrate
 
                                if (sendReturn.getSecond()) {
                                        // did arbitrate
+                                       delete transaction;
                                        oldindex--;
                                }
                        }
                                        oldindex--;
                                }
                        }
@@ -722,7 +748,7 @@ bool Table::sendToServer(NewKey *newKey) {
                        }
 
                        // Create the slot
                        }
 
                        // Create the slot
-                       Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
+                       Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, new Array<char>(buffer->getSlot(sequenceNumber)->getHMAC()), localSequenceNumber);
                        localSequenceNumber++;
 
                        // Try to fill the slot with data
                        localSequenceNumber++;
 
                        // Try to fill the slot with data
@@ -785,7 +811,8 @@ bool Table::sendToServer(NewKey *newKey) {
                                                //Add part back in
                                                pendingSendArbitrationRounds->set(oldcount++,
                                                                                                                                                                                        pendingSendArbitrationRounds->get(i));
                                                //Add part back in
                                                pendingSendArbitrationRounds->set(oldcount++,
                                                                                                                                                                                        pendingSendArbitrationRounds->get(i));
-                                       }
+                                       } else
+                                               delete pendingSendArbitrationRounds->get(i);
                                }
                                pendingSendArbitrationRounds->setSize(oldcount);
 
                                }
                                pendingSendArbitrationRounds->setSize(oldcount);
 
@@ -1437,10 +1464,13 @@ void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resi
                                s->addEntry(liveentry);
                        else {
                                skipcount++;
                                s->addEntry(liveentry);
                        else {
                                skipcount++;
-                               if (skipcount > Table_SKIP_THRESHOLD)
+                               if (skipcount > Table_SKIP_THRESHOLD) {
+                                       delete liveentries;
                                        goto donesearch;
                                        goto donesearch;
+                               }
                        }
                }
                        }
                }
+               delete liveentries;
        }
 donesearch:
        ;
        }
 donesearch:
        ;
@@ -1502,10 +1532,11 @@ void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal
                // must have a last message message-> If not then the server is
                // hiding slots
                if (!machineSet->isEmpty()) {
                // must have a last message message-> If not then the server is
                // hiding slots
                if (!machineSet->isEmpty()) {
+                       delete machineSet;
                        throw new Error("Missing record for machines: ");
                }
        }
                        throw new Error("Missing record for machines: ");
                }
        }
-
+       delete machineSet;
        // Update the size of our local block chain->
        commitNewMaxSize();
 
        // Update the size of our local block chain->
        commitNewMaxSize();
 
@@ -1656,14 +1687,14 @@ void Table::processNewTransactionParts() {
                        if (transaction == NULL) {
                                // This is a new transaction that we dont have so make a new one
                                transaction = new Transaction();
                        if (transaction == NULL) {
                                // This is a new transaction that we dont have so make a new one
                                transaction = new Transaction();
+                               
+                               // Add that part to the transaction
+                               transaction->addPartDecode(part);
 
                                // Insert this new transaction into the live tables
                                liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
 
                                // Insert this new transaction into the live tables
                                liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
-                               liveTransactionByTransactionIdTable->put(new Pair<int64_t, int64_t>(part->getTransactionId()), transaction);
+                               liveTransactionByTransactionIdTable->put(transaction->getId(), transaction);
                        }
                        }
-
-                       // Add that part to the transaction
-                       transaction->addPartDecode(part);
                }
                delete ptit;
        }
                }
                delete ptit;
        }
@@ -1724,14 +1755,12 @@ void Table::arbitrateFromServer() {
                        continue;
                }
 
                        continue;
                }
 
-
                if (!transaction->isComplete()) {
                        // Will arbitrate in incorrect order if we continue so just break
                        // Most likely this
                        break;
                }
 
                if (!transaction->isComplete()) {
                        // Will arbitrate in incorrect order if we continue so just break
                        // Most likely this
                        break;
                }
 
-
                // update the largest transaction seen by arbitrator from server
                if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
                        lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
                // update the largest transaction seen by arbitrator from server
                if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
                        lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
@@ -1744,7 +1773,6 @@ void Table::arbitrateFromServer() {
 
                if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
                        // Guard evaluated as true
 
                if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
                        // Guard evaluated as true
-
                        // Update the local changes so we can make the commit
                        SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
                        while (kvit->hasNext()) {
                        // Update the local changes so we can make the commit
                        SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
                        while (kvit->hasNext()) {
@@ -1941,7 +1969,7 @@ Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
 }
 
 /**
 }
 
 /**
- * Compacts the arbitration data my merging commits and aggregating
+ * Compacts the arbitration data by merging commits and aggregating
  * aborts so that a single large push of commits can be done instead
  * of many small updates
  */
  * aborts so that a single large push of commits can be done instead
  * of many small updates
  */
@@ -1996,6 +2024,16 @@ bool Table::compactArbitrationData() {
                        }
 
                        // Set the new compacted part
                        }
 
                        // Set the new compacted part
+                       if (lastRound->getCommit() == newCommit)
+                               lastRound->setCommit(NULL);
+                       if (round->getCommit() == newCommit)
+                               round->setCommit(NULL);
+                       
+                       if (lastRound->getCommit() != NULL) {
+                               Commit * oldcommit = lastRound->getCommit();
+                               lastRound->setCommit(NULL);
+                               delete oldcommit;
+                       }
                        lastRound->setCommit(newCommit);
                        lastRound->addAborts(round->getAborts());
                        gotNewCommit = true;
                        lastRound->setCommit(newCommit);
                        lastRound->addAborts(round->getAborts());
                        gotNewCommit = true;
@@ -2007,15 +2045,11 @@ bool Table::compactArbitrationData() {
        if (numberToDelete != 1) {
                // If there is a compaction
                // Delete the previous pieces that are now in the new compacted piece
        if (numberToDelete != 1) {
                // If there is a compaction
                // Delete the previous pieces that are now in the new compacted piece
-               if (numberToDelete == pendingSendArbitrationRounds->size()) {
-                       pendingSendArbitrationRounds->clear();
-               } else {
-                       for (uint i = 0; i < numberToDelete; i++) {
-                               pendingSendArbitrationRounds->removeIndex(pendingSendArbitrationRounds->size() - 1);
-                       }
+               for (uint i = 2; i <= numberToDelete; i++) {
+                       delete pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size()-i);
                }
                }
+               pendingSendArbitrationRounds->setSize(pendingSendArbitrationRounds->size() - numberToDelete);
 
 
-               // Add the new compacted into the pending to send list
                pendingSendArbitrationRounds->add(lastRound);
 
                // Should reinsert into the commit processor
                pendingSendArbitrationRounds->add(lastRound);
 
                // Should reinsert into the commit processor
@@ -2154,7 +2188,6 @@ bool Table::updateCommittedTable() {
                        // We have already seen this commit before so need to do the
                        // full processing on this commit
                        if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
                        // We have already seen this commit before so need to do the
                        // full processing on this commit
                        if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
-
                                // Update the last transaction that was updated if we can
                                if (commit->getTransactionSequenceNumber() != -1) {
                                        int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
                                // Update the last transaction that was updated if we can
                                if (commit->getTransactionSequenceNumber() != -1) {
                                        int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
@@ -2163,7 +2196,6 @@ bool Table::updateCommittedTable() {
                                                lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
                                        }
                                }
                                                lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
                                        }
                                }
-
                                continue;
                        }
 
                                continue;
                        }
 
@@ -2209,6 +2241,7 @@ bool Table::updateCommittedTable() {
                                }
                        }
                        delete commitit;
                                }
                        }
                        delete commitit;
+                       delete commitsToEdit;
 
                        // Update the last seen sequence number from this arbitrator
                        if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) {
 
                        // Update the last seen sequence number from this arbitrator
                        if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) {
@@ -2227,6 +2260,11 @@ bool Table::updateCommittedTable() {
                                SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
                                while (kvit->hasNext()) {
                                        KeyValue *kv = kvit->next();
                                SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
                                while (kvit->hasNext()) {
                                        KeyValue *kv = kvit->next();
+                                       printf("Commited KeyValue Table update for %p\n", this);
+                                       kv->getKey()->print();
+                                       printf("\n");
+                                       kv->getValue()->print();
+                                       printf("\n");
                                        committedKeyValueTable->put(kv->getKey(), kv);
                                        liveCommitsByKeyTable->put(kv->getKey(), commit);
                                }
                                        committedKeyValueTable->put(kv->getKey(), kv);
                                        liveCommitsByKeyTable->put(kv->getKey(), commit);
                                }
@@ -2575,6 +2613,7 @@ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
        if (deviceWatchSet->isEmpty()) {
                // This rejected message has been seen by all the clients so
                entry->setDead();
        if (deviceWatchSet->isEmpty()) {
                // This rejected message has been seen by all the clients so
                entry->setDead();
+               delete deviceWatchSet;
        } else {
                // We need to watch this rejected message
                entry->setWatchSet(deviceWatchSet);
        } else {
                // We need to watch this rejected message
                entry->setWatchSet(deviceWatchSet);
@@ -2685,7 +2724,8 @@ void Table::processEntry(TransactionPart *entry) {
 void Table::processEntry(CommitPart *entry) {
        // Update the last transaction that was updated if we can
        if (entry->getTransactionSequenceNumber() != -1) {
 void Table::processEntry(CommitPart *entry) {
        // Update the last transaction that was updated if we can
        if (entry->getTransactionSequenceNumber() != -1) {
-               if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId() || lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber())) {
+               if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId()) ||
+                               lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber()) {
                        lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
                }
        }
                        lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
                }
        }
@@ -2698,7 +2738,7 @@ void Table::processEntry(CommitPart *entry) {
        }
        // Update the part and set dead ones we have already seen (got a
        // rescued version)
        }
        // Update the part and set dead ones we have already seen (got a
        // rescued version)
-       CommitPart *previouslySeenPart = commitPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
+       CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
        if (previouslySeenPart != NULL) {
                previouslySeenPart->setDead();
        }
        if (previouslySeenPart != NULL) {
                previouslySeenPart->setDead();
        }
@@ -2732,7 +2772,6 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liven
                                rmit->remove();
                                // Decrement machines that need to see this notification
                                rm->removeWatcher(machineId);
                                rmit->remove();
                                // Decrement machines that need to see this notification
                                rm->removeWatcher(machineId);
-                               delete rm;
                        }
                }
                delete rmit;
                        }
                }
                delete rmit;