From d0f7ade2b1b60015ff3cb6ada2071dfafaa656f5 Mon Sep 17 00:00:00 2001 From: bdemsky Date: Thu, 15 Mar 2018 04:25:20 -0700 Subject: [PATCH] edits --- version2/src/C/ArbitrationRound.cc | 2 + version2/src/C/CloudComm.cc | 27 +++++++- version2/src/C/Commit.cc | 8 +++ version2/src/C/Commit.h | 1 + version2/src/C/CommitPart.cc | 4 +- version2/src/C/CommitPart.h | 2 +- version2/src/C/RejectedMessage.cc | 5 ++ version2/src/C/RejectedMessage.h | 6 +- version2/src/C/Slot.cc | 12 +++- version2/src/C/Slot.h | 3 +- version2/src/C/SlotBuffer.cc | 15 +++++ version2/src/C/Table.cc | 95 ++++++++++++++++++++-------- version2/src/C/Transaction.cc | 6 ++ version2/src/C/Transaction.h | 1 + version2/src/java/iotcloud/Test.java | 2 +- 15 files changed, 151 insertions(+), 38 deletions(-) diff --git a/version2/src/C/ArbitrationRound.cc b/version2/src/C/ArbitrationRound.cc index f5a1521..543c2d0 100644 --- a/version2/src/C/ArbitrationRound.cc +++ b/version2/src/C/ArbitrationRound.cc @@ -20,6 +20,8 @@ ArbitrationRound::ArbitrationRound(Commit *_commit, Hashset *_abortsBef ArbitrationRound::~ArbitrationRound() { delete abortsBefore; delete parts; + if (commit != NULL) + delete commit; } void ArbitrationRound::generateParts() { diff --git a/version2/src/C/CloudComm.cc b/version2/src/C/CloudComm.cc index 2a7d233..c229259 100644 --- a/version2/src/C/CloudComm.cc +++ b/version2/src/C/CloudComm.cc @@ -64,9 +64,16 @@ CloudComm::CloudComm(Table *_table, IoTString *_baseurl, IoTString *_password, } CloudComm::~CloudComm() { - delete random; delete getslot; delete putslot; + if (salt) + delete salt; + if (password) + delete password; + if (random) + delete random; + if (baseurl) + delete baseurl; } /** @@ -437,12 +444,18 @@ bool CloudComm::getSalt() { try { timer->startTime(); wc = openURL(urlstr); + delete urlstr; + urlstr = NULL; closeURLReq(&wc); timer->endTime(); } catch (SocketTimeoutException *e) { + if (urlstr) + delete urlstr; timer->endTime(); throw new ServerException("getSalt failed", ServerException_TypeConnectTimeout); } catch (Exception *e) { + if (urlstr) + delete urlstr; throw new Error("getSlot failed"); } @@ -559,30 +572,38 @@ Array *CloudComm::putSlot(Slot *slot, int max) { throw new Error("putSlot failed"); } + Array *resptype = NULL; try { int respcode = getResponseCode(&wc); readHeaders(&wc); timer->startTime(); - Array *resptype = new Array(7); + resptype = new Array(7); readURLData(&wc, resptype); timer->endTime(); if (resptype->equals(getslot)) { + delete resptype; Array *tmp = processSlots(&wc); close(wc.fd); return tmp; } else if (resptype->equals(putslot)) { + delete resptype; close(wc.fd); return NULL; } else { + delete resptype; close(wc.fd); throw new Error("Bad response to putslot"); } } catch (SocketTimeoutException *e) { + if (resptype != NULL) + delete resptype; timer->endTime(); close(wc.fd); throw new ServerException("putSlot failed", ServerException_TypeInputTimeout); } catch (Exception *e) { + if (resptype != NULL) + delete resptype; throw new Error("putSlot failed"); } } @@ -627,6 +648,7 @@ Array *CloudComm::getSlots(int64_t sequencenumber) { if (!resptype->equals(getslot)) throw new Error("Bad Response: "); + delete resptype; Array *tmp = processSlots(&wc); close(wc.fd); return tmp; @@ -658,6 +680,7 @@ Array *CloudComm::processSlots(WebConnection *wc) { slots->set(i, Slot_decode(table, data, mac)); delete data; } + delete sizesofslots; return slots; } diff --git a/version2/src/C/Commit.cc b/version2/src/C/Commit.cc index 5ed8c89..6c46fed 100644 --- a/version2/src/C/Commit.cc +++ b/version2/src/C/Commit.cc @@ -31,6 +31,14 @@ Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transaction liveKeys(new Hashset) { } +Commit::~Commit() { + delete parts; + delete keyValueUpdateSet; + delete liveKeys; + if (missingParts != NULL) + delete missingParts; +} + void Commit::addPartDecode(CommitPart *newPart) { if (isDead) { // If dead then just kill this part and move on diff --git a/version2/src/C/Commit.h b/version2/src/C/Commit.h index 1c48250..df263ae 100644 --- a/version2/src/C/Commit.h +++ b/version2/src/C/Commit.h @@ -22,6 +22,7 @@ private: public: Commit(); Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber); + ~Commit(); void addPartDecode(CommitPart *newPart); int64_t getSequenceNumber(); int64_t getTransactionSequenceNumber(); diff --git a/version2/src/C/CommitPart.cc b/version2/src/C/CommitPart.cc index 2e08c4b..a23f5de 100644 --- a/version2/src/C/CommitPart.cc +++ b/version2/src/C/CommitPart.cc @@ -36,8 +36,8 @@ Array *CommitPart::getData() { return data; } -Pair CommitPart::getPartId() { - return partId; +Pair * CommitPart::getPartId() { + return & partId; } Pair CommitPart::getCommitId() { diff --git a/version2/src/C/CommitPart.h b/version2/src/C/CommitPart.h index a569f1c..606f889 100644 --- a/version2/src/C/CommitPart.h +++ b/version2/src/C/CommitPart.h @@ -27,7 +27,7 @@ public: int getPartNumber(); int getDataSize(); Array *getData(); - Pair getPartId(); + Pair * getPartId(); Pair getCommitId(); bool isLastPart(); int64_t getMachineId(); diff --git a/version2/src/C/RejectedMessage.cc b/version2/src/C/RejectedMessage.cc index da5d574..261f30c 100644 --- a/version2/src/C/RejectedMessage.cc +++ b/version2/src/C/RejectedMessage.cc @@ -18,6 +18,11 @@ Entry *RejectedMessage_decode(Slot *slot, ByteBuffer *bb) { return new RejectedMessage(slot,sequencenum, machineid, oldseqnum, newseqnum, equalto == 1); } +RejectedMessage::~RejectedMessage() { + if (watchset != NULL) + delete watchset; +} + void RejectedMessage::removeWatcher(int64_t machineid) { if (watchset->remove(machineid)) if (watchset->isEmpty()) diff --git a/version2/src/C/RejectedMessage.h b/version2/src/C/RejectedMessage.h index c9d4d79..95b0dc4 100644 --- a/version2/src/C/RejectedMessage.h +++ b/version2/src/C/RejectedMessage.h @@ -32,9 +32,11 @@ public: machineid(_machineid), oldseqnum(_oldseqnum), newseqnum(_newseqnum), - equalto(_equalto) { + equalto(_equalto), + watchset(NULL) { } - + ~RejectedMessage(); + int64_t getOldSeqNum() { return oldseqnum; } int64_t getNewSeqNum() { return newseqnum; } bool getEqual() { return equalto; } diff --git a/version2/src/C/Slot.cc b/version2/src/C/Slot.cc index 03aa3cc..4a6f268 100644 --- a/version2/src/C/Slot.cc +++ b/version2/src/C/Slot.cc @@ -46,6 +46,15 @@ Slot::Slot(Table *_table, int64_t _seqnum, int64_t _machineid, int64_t _localSeq localSequenceNumber(_localSequenceNumber) { } +Slot::~Slot() { + if (hmac != NULL) + delete hmac; + delete prevhmac; + for(uint i=0; i< entries->size(); i++) + delete entries->get(i); + delete entries; +} + Entry *Slot::addEntry(Entry *e) { e = e->getCopy(this); entries->add(e); @@ -84,7 +93,8 @@ Slot *Slot_decode(Table *table, Array *array, Mac *mac) { bb->get(prevhmac); if (!realmac->equals(hmac)) throw new Error("Server Error: Invalid HMAC! Potential Attack!"); - + delete realmac; + int64_t seqnum = bb->getLong(); int64_t machineid = bb->getLong(); int numentries = bb->getInt(); diff --git a/version2/src/C/Slot.h b/version2/src/C/Slot.h index d6e7575..56732a4 100644 --- a/version2/src/C/Slot.h +++ b/version2/src/C/Slot.h @@ -36,7 +36,8 @@ public: Slot(Table *_table, int64_t _seqnum, int64_t _machineid, Array *_prevhmac, Array *_hmac, int64_t _localSequenceNumber); Slot(Table *_table, int64_t _seqnum, int64_t _machineid, Array *_prevhmac, int64_t _localSequenceNumber); Slot(Table *_table, int64_t _seqnum, int64_t _machineid, int64_t _localSequenceNumber); - + ~Slot(); + Array *getHMAC() { return hmac; } Array *getPrevHMAC() { return prevhmac; } Entry *addEntry(Entry *e); diff --git a/version2/src/C/SlotBuffer.cc b/version2/src/C/SlotBuffer.cc index aa217ac..af6b031 100644 --- a/version2/src/C/SlotBuffer.cc +++ b/version2/src/C/SlotBuffer.cc @@ -14,6 +14,13 @@ SlotBuffer::SlotBuffer() : } SlotBuffer::~SlotBuffer() { + uint index = tail; + while (index != head) { + delete array->get(index); + index++; + if (index == array->length()) + index = 0; + } delete array; } @@ -51,6 +58,7 @@ void SlotBuffer::incrementHead() { } void SlotBuffer::incrementTail() { + delete array->get(tail); tail++; if (((uint32_t)tail) >= array->length()) tail = 0; @@ -60,6 +68,13 @@ void SlotBuffer::putSlot(Slot *s) { int64_t checkNum = (getNewestSeqNum() + 1); if (checkNum != s->getSequenceNumber()) { + uint index = tail; + while (index != head) { + delete array->get(index); + index++; + if (index == array->length()) + index = 0; + } oldestseqn = s->getSequenceNumber(); tail = 0; head = 1; diff --git a/version2/src/C/Table.cc b/version2/src/C/Table.cc index a566009..15dee76 100644 --- a/version2/src/C/Table.cc +++ b/version2/src/C/Table.cc @@ -167,7 +167,16 @@ Table::~Table() { delete speculatedKeyValueTable; delete pendingTransactionSpeculatedKeyValueTable; delete liveNewKeyTable; - delete lastMessageTable; + { + SetIterator *> *lmit = getKeyIterator(lastMessageTable); + while (lmit->hasNext()) { + Pair * pair = lastMessageTable->get(lmit->next()); + } + delete lmit; + delete lastMessageTable; + } + if (pendingTransactionBuilder != NULL) + delete pendingTransactionBuilder; { SetIterator *> *rmit = getKeyIterator(rejectedMessageWatchVectorTable); while(rmit->hasNext()) { @@ -233,7 +242,13 @@ Table::~Table() { delete liveCommitsByKeyTable; delete lastCommitSeenSequenceNumberByArbitratorTable; delete rejectedSlotVector; - delete pendingTransactionQueue; + { + uint size = pendingTransactionQueue->size(); + for (uint iter = 0; iter < size; iter++) { + delete pendingTransactionQueue->get(iter); + } + delete pendingTransactionQueue; + } delete pendingSendArbitrationEntriesToDelete; delete transactionPartsSent; delete outstandingTransactionStatus; @@ -241,7 +256,12 @@ Table::~Table() { delete offlineTransactionsCommittedAndAtServer; delete localCommunicationTable; delete lastTransactionSeenFromMachineFromServer; - delete pendingSendArbitrationRounds; + { + for(uint i = 0; i < pendingSendArbitrationRounds->size(); i++) { + delete pendingSendArbitrationRounds->get(i); + } + delete pendingSendArbitrationRounds; + } if (lastTransactionPartsSent != NULL) delete lastTransactionPartsSent; delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator; @@ -461,6 +481,8 @@ bool Table::createNewKey(IoTString *keyName, int64_t machineId) { void Table::startTransaction() { // Create a new transaction, invalidates any old pending transactions. + if (pendingTransactionBuilder != NULL) + delete pendingTransactionBuilder; pendingTransactionBuilder = new PendingTransaction(localMachineId); } @@ -503,9 +525,12 @@ TransactionStatus *Table::commitTransaction() { pendingTransactionQueue->add(newTransaction); } else { arbitrateOnLocalTransaction(newTransaction); + delete newTransaction; updateLiveStateFromLocal(); } - + if (pendingTransactionBuilder != NULL) + delete pendingTransactionBuilder; + pendingTransactionBuilder = new PendingTransaction(localMachineId); try { @@ -535,6 +560,7 @@ TransactionStatus *Table::commitTransaction() { if (sendReturn.getSecond()) { // did arbitrate + delete transaction; oldindex--; } } @@ -722,7 +748,7 @@ bool Table::sendToServer(NewKey *newKey) { } // Create the slot - Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber); + Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, new Array(buffer->getSlot(sequenceNumber)->getHMAC()), localSequenceNumber); localSequenceNumber++; // Try to fill the slot with data @@ -785,7 +811,8 @@ bool Table::sendToServer(NewKey *newKey) { //Add part back in pendingSendArbitrationRounds->set(oldcount++, pendingSendArbitrationRounds->get(i)); - } + } else + delete pendingSendArbitrationRounds->get(i); } pendingSendArbitrationRounds->setSize(oldcount); @@ -1437,10 +1464,13 @@ void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resi s->addEntry(liveentry); else { skipcount++; - if (skipcount > Table_SKIP_THRESHOLD) + if (skipcount > Table_SKIP_THRESHOLD) { + delete liveentries; goto donesearch; + } } } + delete liveentries; } donesearch: ; @@ -1502,10 +1532,11 @@ void Table::validateAndUpdate(Array *newSlots, bool acceptUpdatesToLocal // must have a last message message-> If not then the server is // hiding slots if (!machineSet->isEmpty()) { + delete machineSet; throw new Error("Missing record for machines: "); } } - + delete machineSet; // Update the size of our local block chain-> commitNewMaxSize(); @@ -1656,14 +1687,14 @@ void Table::processNewTransactionParts() { if (transaction == NULL) { // This is a new transaction that we dont have so make a new one transaction = new Transaction(); + + // Add that part to the transaction + transaction->addPartDecode(part); // Insert this new transaction into the live tables liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction); - liveTransactionByTransactionIdTable->put(new Pair(part->getTransactionId()), transaction); + liveTransactionByTransactionIdTable->put(transaction->getId(), transaction); } - - // Add that part to the transaction - transaction->addPartDecode(part); } delete ptit; } @@ -1724,14 +1755,12 @@ void Table::arbitrateFromServer() { continue; } - if (!transaction->isComplete()) { // Will arbitrate in incorrect order if we continue so just break // Most likely this break; } - // update the largest transaction seen by arbitrator from server if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) { lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber()); @@ -1744,7 +1773,6 @@ void Table::arbitrateFromServer() { if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) { // Guard evaluated as true - // Update the local changes so we can make the commit SetIterator *kvit = transaction->getKeyValueUpdateSet()->iterator(); while (kvit->hasNext()) { @@ -1941,7 +1969,7 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { } /** - * Compacts the arbitration data my merging commits and aggregating + * Compacts the arbitration data by merging commits and aggregating * aborts so that a single large push of commits can be done instead * of many small updates */ @@ -1996,6 +2024,16 @@ bool Table::compactArbitrationData() { } // Set the new compacted part + if (lastRound->getCommit() == newCommit) + lastRound->setCommit(NULL); + if (round->getCommit() == newCommit) + round->setCommit(NULL); + + if (lastRound->getCommit() != NULL) { + Commit * oldcommit = lastRound->getCommit(); + lastRound->setCommit(NULL); + delete oldcommit; + } lastRound->setCommit(newCommit); lastRound->addAborts(round->getAborts()); gotNewCommit = true; @@ -2007,15 +2045,11 @@ bool Table::compactArbitrationData() { if (numberToDelete != 1) { // If there is a compaction // Delete the previous pieces that are now in the new compacted piece - if (numberToDelete == pendingSendArbitrationRounds->size()) { - pendingSendArbitrationRounds->clear(); - } else { - for (uint i = 0; i < numberToDelete; i++) { - pendingSendArbitrationRounds->removeIndex(pendingSendArbitrationRounds->size() - 1); - } + for (uint i = 2; i <= numberToDelete; i++) { + delete pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size()-i); } + pendingSendArbitrationRounds->setSize(pendingSendArbitrationRounds->size() - numberToDelete); - // Add the new compacted into the pending to send list pendingSendArbitrationRounds->add(lastRound); // Should reinsert into the commit processor @@ -2154,7 +2188,6 @@ bool Table::updateCommittedTable() { // We have already seen this commit before so need to do the // full processing on this commit if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) { - // Update the last transaction that was updated if we can if (commit->getTransactionSequenceNumber() != -1) { int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()); @@ -2163,7 +2196,6 @@ bool Table::updateCommittedTable() { lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber()); } } - continue; } @@ -2209,6 +2241,7 @@ bool Table::updateCommittedTable() { } } delete commitit; + delete commitsToEdit; // Update the last seen sequence number from this arbitrator if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) { @@ -2227,6 +2260,11 @@ bool Table::updateCommittedTable() { SetIterator *kvit = commit->getKeyValueUpdateSet()->iterator(); while (kvit->hasNext()) { KeyValue *kv = kvit->next(); + printf("Commited KeyValue Table update for %p\n", this); + kv->getKey()->print(); + printf("\n"); + kv->getValue()->print(); + printf("\n"); committedKeyValueTable->put(kv->getKey(), kv); liveCommitsByKeyTable->put(kv->getKey(), commit); } @@ -2575,6 +2613,7 @@ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) { if (deviceWatchSet->isEmpty()) { // This rejected message has been seen by all the clients so entry->setDead(); + delete deviceWatchSet; } else { // We need to watch this rejected message entry->setWatchSet(deviceWatchSet); @@ -2685,7 +2724,8 @@ void Table::processEntry(TransactionPart *entry) { void Table::processEntry(CommitPart *entry) { // Update the last transaction that was updated if we can if (entry->getTransactionSequenceNumber() != -1) { - if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId() || lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber())) { + if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId()) || + lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber()) { lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber()); } } @@ -2698,7 +2738,7 @@ void Table::processEntry(CommitPart *entry) { } // Update the part and set dead ones we have already seen (got a // rescued version) - CommitPart *previouslySeenPart = commitPart->put(new Pair(entry->getPartId()), entry); + CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry); if (previouslySeenPart != NULL) { previouslySeenPart->setDead(); } @@ -2732,7 +2772,6 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liven rmit->remove(); // Decrement machines that need to see this notification rm->removeWatcher(machineId); - delete rm; } } delete rmit; diff --git a/version2/src/C/Transaction.cc b/version2/src/C/Transaction.cc index d316aa1..80e70cd 100644 --- a/version2/src/C/Transaction.cc +++ b/version2/src/C/Transaction.cc @@ -26,6 +26,12 @@ Transaction::Transaction() : hadServerFailure(false) { } +Transaction::~Transaction() { + delete parts; + delete keyValueGuardSet; + delete keyValueUpdateSet; +} + void Transaction::addPartEncode(TransactionPart *newPart) { TransactionPart *old = parts->setExpand(newPart->getPartNumber(), newPart); if (old == NULL) diff --git a/version2/src/C/Transaction.h b/version2/src/C/Transaction.h index 08eebea..57ce43a 100644 --- a/version2/src/C/Transaction.h +++ b/version2/src/C/Transaction.h @@ -28,6 +28,7 @@ private: public: Transaction(); + ~Transaction(); void addPartEncode(TransactionPart *newPart); void addPartDecode(TransactionPart *newPart); void addUpdateKV(KeyValue *kv); diff --git a/version2/src/java/iotcloud/Test.java b/version2/src/java/iotcloud/Test.java index d80a102..a3a5a7a 100644 --- a/version2/src/java/iotcloud/Test.java +++ b/version2/src/java/iotcloud/Test.java @@ -11,7 +11,7 @@ import java.util.ArrayList; public class Test { - public static final int NUMBER_OF_TESTS = 1000; + public static final int NUMBER_OF_TESTS = 2; public static void main(String[] args) throws ServerException { if (args[0].equals("2")) { -- 2.34.1