X-Git-Url: http://plrg.eecs.uci.edu/git/?p=iotcloud.git;a=blobdiff_plain;f=version2%2Fsrc%2FC%2FTable.cc;fp=version2%2Fsrc%2FC%2FTable.cc;h=15dee7683a05545c1c9f0aea98484ba4a0d114d3;hp=a5660091fd614018bc10f7966e0b8cee66c83308;hb=d0f7ade2b1b60015ff3cb6ada2071dfafaa656f5;hpb=793c445e17d24c146e7e820a5ecb5f02a5e00ea3 diff --git a/version2/src/C/Table.cc b/version2/src/C/Table.cc index a566009..15dee76 100644 --- a/version2/src/C/Table.cc +++ b/version2/src/C/Table.cc @@ -167,7 +167,16 @@ Table::~Table() { delete speculatedKeyValueTable; delete pendingTransactionSpeculatedKeyValueTable; delete liveNewKeyTable; - delete lastMessageTable; + { + SetIterator *> *lmit = getKeyIterator(lastMessageTable); + while (lmit->hasNext()) { + Pair * pair = lastMessageTable->get(lmit->next()); + } + delete lmit; + delete lastMessageTable; + } + if (pendingTransactionBuilder != NULL) + delete pendingTransactionBuilder; { SetIterator *> *rmit = getKeyIterator(rejectedMessageWatchVectorTable); while(rmit->hasNext()) { @@ -233,7 +242,13 @@ Table::~Table() { delete liveCommitsByKeyTable; delete lastCommitSeenSequenceNumberByArbitratorTable; delete rejectedSlotVector; - delete pendingTransactionQueue; + { + uint size = pendingTransactionQueue->size(); + for (uint iter = 0; iter < size; iter++) { + delete pendingTransactionQueue->get(iter); + } + delete pendingTransactionQueue; + } delete pendingSendArbitrationEntriesToDelete; delete transactionPartsSent; delete outstandingTransactionStatus; @@ -241,7 +256,12 @@ Table::~Table() { delete offlineTransactionsCommittedAndAtServer; delete localCommunicationTable; delete lastTransactionSeenFromMachineFromServer; - delete pendingSendArbitrationRounds; + { + for(uint i = 0; i < pendingSendArbitrationRounds->size(); i++) { + delete pendingSendArbitrationRounds->get(i); + } + delete pendingSendArbitrationRounds; + } if (lastTransactionPartsSent != NULL) delete lastTransactionPartsSent; delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator; @@ -461,6 +481,8 @@ bool Table::createNewKey(IoTString *keyName, int64_t machineId) { void Table::startTransaction() { // Create a new transaction, invalidates any old pending transactions. + if (pendingTransactionBuilder != NULL) + delete pendingTransactionBuilder; pendingTransactionBuilder = new PendingTransaction(localMachineId); } @@ -503,9 +525,12 @@ TransactionStatus *Table::commitTransaction() { pendingTransactionQueue->add(newTransaction); } else { arbitrateOnLocalTransaction(newTransaction); + delete newTransaction; updateLiveStateFromLocal(); } - + if (pendingTransactionBuilder != NULL) + delete pendingTransactionBuilder; + pendingTransactionBuilder = new PendingTransaction(localMachineId); try { @@ -535,6 +560,7 @@ TransactionStatus *Table::commitTransaction() { if (sendReturn.getSecond()) { // did arbitrate + delete transaction; oldindex--; } } @@ -722,7 +748,7 @@ bool Table::sendToServer(NewKey *newKey) { } // Create the slot - Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber); + Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, new Array(buffer->getSlot(sequenceNumber)->getHMAC()), localSequenceNumber); localSequenceNumber++; // Try to fill the slot with data @@ -785,7 +811,8 @@ bool Table::sendToServer(NewKey *newKey) { //Add part back in pendingSendArbitrationRounds->set(oldcount++, pendingSendArbitrationRounds->get(i)); - } + } else + delete pendingSendArbitrationRounds->get(i); } pendingSendArbitrationRounds->setSize(oldcount); @@ -1437,10 +1464,13 @@ void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resi s->addEntry(liveentry); else { skipcount++; - if (skipcount > Table_SKIP_THRESHOLD) + if (skipcount > Table_SKIP_THRESHOLD) { + delete liveentries; goto donesearch; + } } } + delete liveentries; } donesearch: ; @@ -1502,10 +1532,11 @@ void Table::validateAndUpdate(Array *newSlots, bool acceptUpdatesToLocal // must have a last message message-> If not then the server is // hiding slots if (!machineSet->isEmpty()) { + delete machineSet; throw new Error("Missing record for machines: "); } } - + delete machineSet; // Update the size of our local block chain-> commitNewMaxSize(); @@ -1656,14 +1687,14 @@ void Table::processNewTransactionParts() { if (transaction == NULL) { // This is a new transaction that we dont have so make a new one transaction = new Transaction(); + + // Add that part to the transaction + transaction->addPartDecode(part); // Insert this new transaction into the live tables liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction); - liveTransactionByTransactionIdTable->put(new Pair(part->getTransactionId()), transaction); + liveTransactionByTransactionIdTable->put(transaction->getId(), transaction); } - - // Add that part to the transaction - transaction->addPartDecode(part); } delete ptit; } @@ -1724,14 +1755,12 @@ void Table::arbitrateFromServer() { continue; } - if (!transaction->isComplete()) { // Will arbitrate in incorrect order if we continue so just break // Most likely this break; } - // update the largest transaction seen by arbitrator from server if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) { lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber()); @@ -1744,7 +1773,6 @@ void Table::arbitrateFromServer() { if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) { // Guard evaluated as true - // Update the local changes so we can make the commit SetIterator *kvit = transaction->getKeyValueUpdateSet()->iterator(); while (kvit->hasNext()) { @@ -1941,7 +1969,7 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { } /** - * Compacts the arbitration data my merging commits and aggregating + * Compacts the arbitration data by merging commits and aggregating * aborts so that a single large push of commits can be done instead * of many small updates */ @@ -1996,6 +2024,16 @@ bool Table::compactArbitrationData() { } // Set the new compacted part + if (lastRound->getCommit() == newCommit) + lastRound->setCommit(NULL); + if (round->getCommit() == newCommit) + round->setCommit(NULL); + + if (lastRound->getCommit() != NULL) { + Commit * oldcommit = lastRound->getCommit(); + lastRound->setCommit(NULL); + delete oldcommit; + } lastRound->setCommit(newCommit); lastRound->addAborts(round->getAborts()); gotNewCommit = true; @@ -2007,15 +2045,11 @@ bool Table::compactArbitrationData() { if (numberToDelete != 1) { // If there is a compaction // Delete the previous pieces that are now in the new compacted piece - if (numberToDelete == pendingSendArbitrationRounds->size()) { - pendingSendArbitrationRounds->clear(); - } else { - for (uint i = 0; i < numberToDelete; i++) { - pendingSendArbitrationRounds->removeIndex(pendingSendArbitrationRounds->size() - 1); - } + for (uint i = 2; i <= numberToDelete; i++) { + delete pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size()-i); } + pendingSendArbitrationRounds->setSize(pendingSendArbitrationRounds->size() - numberToDelete); - // Add the new compacted into the pending to send list pendingSendArbitrationRounds->add(lastRound); // Should reinsert into the commit processor @@ -2154,7 +2188,6 @@ bool Table::updateCommittedTable() { // We have already seen this commit before so need to do the // full processing on this commit if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) { - // Update the last transaction that was updated if we can if (commit->getTransactionSequenceNumber() != -1) { int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()); @@ -2163,7 +2196,6 @@ bool Table::updateCommittedTable() { lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber()); } } - continue; } @@ -2209,6 +2241,7 @@ bool Table::updateCommittedTable() { } } delete commitit; + delete commitsToEdit; // Update the last seen sequence number from this arbitrator if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) { @@ -2227,6 +2260,11 @@ bool Table::updateCommittedTable() { SetIterator *kvit = commit->getKeyValueUpdateSet()->iterator(); while (kvit->hasNext()) { KeyValue *kv = kvit->next(); + printf("Commited KeyValue Table update for %p\n", this); + kv->getKey()->print(); + printf("\n"); + kv->getValue()->print(); + printf("\n"); committedKeyValueTable->put(kv->getKey(), kv); liveCommitsByKeyTable->put(kv->getKey(), commit); } @@ -2575,6 +2613,7 @@ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) { if (deviceWatchSet->isEmpty()) { // This rejected message has been seen by all the clients so entry->setDead(); + delete deviceWatchSet; } else { // We need to watch this rejected message entry->setWatchSet(deviceWatchSet); @@ -2685,7 +2724,8 @@ void Table::processEntry(TransactionPart *entry) { void Table::processEntry(CommitPart *entry) { // Update the last transaction that was updated if we can if (entry->getTransactionSequenceNumber() != -1) { - if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId() || lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber())) { + if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId()) || + lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber()) { lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber()); } } @@ -2698,7 +2738,7 @@ void Table::processEntry(CommitPart *entry) { } // Update the part and set dead ones we have already seen (got a // rescued version) - CommitPart *previouslySeenPart = commitPart->put(new Pair(entry->getPartId()), entry); + CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry); if (previouslySeenPart != NULL) { previouslySeenPart->setDead(); } @@ -2732,7 +2772,6 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liven rmit->remove(); // Decrement machines that need to see this notification rm->removeWatcher(machineId); - delete rm; } } delete rmit;