edits
authorbdemsky <bdemsky@uci.edu>
Thu, 15 Mar 2018 11:25:20 +0000 (04:25 -0700)
committerbdemsky <bdemsky@uci.edu>
Thu, 15 Mar 2018 11:25:20 +0000 (04:25 -0700)
15 files changed:
version2/src/C/ArbitrationRound.cc
version2/src/C/CloudComm.cc
version2/src/C/Commit.cc
version2/src/C/Commit.h
version2/src/C/CommitPart.cc
version2/src/C/CommitPart.h
version2/src/C/RejectedMessage.cc
version2/src/C/RejectedMessage.h
version2/src/C/Slot.cc
version2/src/C/Slot.h
version2/src/C/SlotBuffer.cc
version2/src/C/Table.cc
version2/src/C/Transaction.cc
version2/src/C/Transaction.h
version2/src/java/iotcloud/Test.java

index f5a1521..543c2d0 100644 (file)
@@ -20,6 +20,8 @@ ArbitrationRound::ArbitrationRound(Commit *_commit, Hashset<Abort *> *_abortsBef
 ArbitrationRound::~ArbitrationRound() {
        delete abortsBefore;
        delete parts;
+       if (commit != NULL)
+               delete commit;
 }
 
 void ArbitrationRound::generateParts() {
index 2a7d233..c229259 100644 (file)
@@ -64,9 +64,16 @@ CloudComm::CloudComm(Table *_table,  IoTString *_baseurl, IoTString *_password,
 }
 
 CloudComm::~CloudComm() {
-       delete random;
        delete getslot;
        delete putslot;
+       if (salt)
+               delete salt;
+       if (password)
+               delete password;
+       if (random)
+               delete random;
+       if (baseurl)
+               delete baseurl;
 }
 
 /**
@@ -437,12 +444,18 @@ bool CloudComm::getSalt() {
        try {
                timer->startTime();
                wc = openURL(urlstr);
+               delete urlstr;
+               urlstr = NULL;
                closeURLReq(&wc);
                timer->endTime();
        } catch (SocketTimeoutException *e) {
+               if (urlstr)
+                       delete urlstr;
                timer->endTime();
                throw new ServerException("getSalt failed", ServerException_TypeConnectTimeout);
        } catch (Exception *e) {
+               if (urlstr)
+                       delete urlstr;
                throw new Error("getSlot failed");
        }
 
@@ -559,30 +572,38 @@ Array<Slot *> *CloudComm::putSlot(Slot *slot, int max) {
                throw new Error("putSlot failed");
        }
 
+       Array<char> *resptype = NULL;
        try {
                int respcode = getResponseCode(&wc);
                readHeaders(&wc);
                timer->startTime();
-               Array<char> *resptype = new Array<char>(7);
+               resptype = new Array<char>(7);
                readURLData(&wc, resptype);
                timer->endTime();
 
                if (resptype->equals(getslot)) {
+                       delete resptype;
                        Array<Slot *> *tmp = processSlots(&wc);
                        close(wc.fd);
                        return tmp;
                } else if (resptype->equals(putslot)) {
+                       delete resptype;
                        close(wc.fd);
                        return NULL;
                } else {
+                       delete resptype;
                        close(wc.fd);
                        throw new Error("Bad response to putslot");
                }
        } catch (SocketTimeoutException *e) {
+               if (resptype != NULL)
+                       delete resptype;
                timer->endTime();
                close(wc.fd);
                throw new ServerException("putSlot failed", ServerException_TypeInputTimeout);
        } catch (Exception *e) {
+               if (resptype != NULL)
+                       delete resptype;
                throw new Error("putSlot failed");
        }
 }
@@ -627,6 +648,7 @@ Array<Slot *> *CloudComm::getSlots(int64_t sequencenumber) {
                if (!resptype->equals(getslot))
                        throw new Error("Bad Response: ");
 
+               delete resptype;
                Array<Slot *> *tmp = processSlots(&wc);
                close(wc.fd);
                return tmp;
@@ -658,6 +680,7 @@ Array<Slot *> *CloudComm::processSlots(WebConnection *wc) {
                slots->set(i, Slot_decode(table, data, mac));
                delete data;
        }
+       delete sizesofslots;
        return slots;
 }
 
index 5ed8c89..6c46fed 100644 (file)
@@ -31,6 +31,14 @@ Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transaction
        liveKeys(new Hashset<IoTString *>) {
 }
 
+Commit::~Commit() {
+       delete parts;
+       delete keyValueUpdateSet;
+       delete liveKeys;
+       if (missingParts != NULL)
+               delete missingParts;
+}
+
 void Commit::addPartDecode(CommitPart *newPart) {
        if (isDead) {
                // If dead then just kill this part and move on
index 1c48250..df263ae 100644 (file)
@@ -22,6 +22,7 @@ private:
 public:
        Commit();
        Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber);
+       ~Commit();
        void addPartDecode(CommitPart *newPart);
        int64_t getSequenceNumber();
        int64_t getTransactionSequenceNumber();
index 2e08c4b..a23f5de 100644 (file)
@@ -36,8 +36,8 @@ Array<char> *CommitPart::getData() {
        return data;
 }
 
-Pair<int64_t, int32_t> CommitPart::getPartId() {
-       return partId;
+Pair<int64_t, int32_t> CommitPart::getPartId() {
+       return partId;
 }
 
 Pair<int64_t, int64_t> CommitPart::getCommitId() {
index a569f1c..606f889 100644 (file)
@@ -27,7 +27,7 @@ public:
        int getPartNumber();
        int getDataSize();
        Array<char> *getData();
-       Pair<int64_t, int32_t> getPartId();
+       Pair<int64_t, int32_t> getPartId();
        Pair<int64_t, int64_t> getCommitId();
        bool isLastPart();
        int64_t getMachineId();
index da5d574..261f30c 100644 (file)
@@ -18,6 +18,11 @@ Entry *RejectedMessage_decode(Slot *slot, ByteBuffer *bb) {
        return new RejectedMessage(slot,sequencenum, machineid, oldseqnum, newseqnum, equalto == 1);
 }
 
+RejectedMessage::~RejectedMessage() {
+       if (watchset != NULL)
+               delete watchset;
+}
+
 void RejectedMessage::removeWatcher(int64_t machineid) {
        if (watchset->remove(machineid))
                if (watchset->isEmpty())
index c9d4d79..95b0dc4 100644 (file)
@@ -32,9 +32,11 @@ public:
                machineid(_machineid),
                oldseqnum(_oldseqnum),
                newseqnum(_newseqnum),
-               equalto(_equalto) {
+               equalto(_equalto),
+               watchset(NULL) {
        }
-
+       ~RejectedMessage();
+       
        int64_t getOldSeqNum() { return oldseqnum; }
        int64_t getNewSeqNum() { return newseqnum; }
        bool getEqual() { return equalto; }
index 03aa3cc..4a6f268 100644 (file)
@@ -46,6 +46,15 @@ Slot::Slot(Table *_table, int64_t _seqnum, int64_t _machineid, int64_t _localSeq
        localSequenceNumber(_localSequenceNumber) {
 }
 
+Slot::~Slot() {
+       if (hmac != NULL)
+               delete hmac;
+       delete prevhmac;
+       for(uint i=0; i< entries->size(); i++)
+               delete entries->get(i);
+       delete entries;
+}
+
 Entry *Slot::addEntry(Entry *e) {
        e = e->getCopy(this);
        entries->add(e);
@@ -84,7 +93,8 @@ Slot *Slot_decode(Table *table, Array<char> *array, Mac *mac) {
        bb->get(prevhmac);
        if (!realmac->equals(hmac))
                throw new Error("Server Error: Invalid HMAC!  Potential Attack!");
-
+       delete realmac;
+       
        int64_t seqnum = bb->getLong();
        int64_t machineid = bb->getLong();
        int numentries = bb->getInt();
index d6e7575..56732a4 100644 (file)
@@ -36,7 +36,8 @@ public:
        Slot(Table *_table, int64_t _seqnum, int64_t _machineid, Array<char> *_prevhmac, Array<char> *_hmac, int64_t _localSequenceNumber);
        Slot(Table *_table, int64_t _seqnum, int64_t _machineid, Array<char> *_prevhmac, int64_t _localSequenceNumber);
        Slot(Table *_table, int64_t _seqnum, int64_t _machineid, int64_t _localSequenceNumber);
-
+       ~Slot();
+       
        Array<char> *getHMAC() { return hmac; }
        Array<char> *getPrevHMAC() { return prevhmac; }
        Entry *addEntry(Entry *e);
index aa217ac..af6b031 100644 (file)
@@ -14,6 +14,13 @@ SlotBuffer::SlotBuffer() :
 }
 
 SlotBuffer::~SlotBuffer() {
+       uint index = tail;
+       while (index != head) {
+               delete array->get(index);
+               index++;
+               if (index == array->length())
+                       index = 0;
+       }
        delete array;
 }
 
@@ -51,6 +58,7 @@ void SlotBuffer::incrementHead() {
 }
 
 void SlotBuffer::incrementTail() {
+       delete array->get(tail);
        tail++;
        if (((uint32_t)tail) >= array->length())
                tail = 0;
@@ -60,6 +68,13 @@ void SlotBuffer::putSlot(Slot *s) {
        int64_t checkNum = (getNewestSeqNum() + 1);
 
        if (checkNum != s->getSequenceNumber()) {
+               uint index = tail;
+               while (index != head) {
+                       delete array->get(index);
+                       index++;
+                       if (index == array->length())
+                               index = 0;
+               }
                oldestseqn = s->getSequenceNumber();
                tail = 0;
                head = 1;
index a566009..15dee76 100644 (file)
@@ -167,7 +167,16 @@ Table::~Table() {
        delete speculatedKeyValueTable;
        delete pendingTransactionSpeculatedKeyValueTable;
        delete liveNewKeyTable;
-       delete lastMessageTable;
+       {
+               SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
+               while (lmit->hasNext()) {
+                       Pair<int64_t, Liveness *> * pair = lastMessageTable->get(lmit->next());
+               }
+               delete lmit;
+               delete lastMessageTable;
+       }
+       if (pendingTransactionBuilder != NULL)
+               delete pendingTransactionBuilder;
        {
                SetIterator<int64_t, Hashset<RejectedMessage *> *> *rmit = getKeyIterator(rejectedMessageWatchVectorTable);
                while(rmit->hasNext()) {
@@ -233,7 +242,13 @@ Table::~Table() {
        delete liveCommitsByKeyTable;
        delete lastCommitSeenSequenceNumberByArbitratorTable;
        delete rejectedSlotVector;
-       delete pendingTransactionQueue;
+       {
+               uint size = pendingTransactionQueue->size();
+               for (uint iter = 0; iter < size; iter++) {
+                       delete pendingTransactionQueue->get(iter);
+               }
+               delete pendingTransactionQueue;
+       }
        delete pendingSendArbitrationEntriesToDelete;
        delete transactionPartsSent;
        delete outstandingTransactionStatus;
@@ -241,7 +256,12 @@ Table::~Table() {
        delete offlineTransactionsCommittedAndAtServer;
        delete localCommunicationTable;
        delete lastTransactionSeenFromMachineFromServer;
-       delete pendingSendArbitrationRounds;
+       {
+               for(uint i = 0; i < pendingSendArbitrationRounds->size(); i++) {
+                       delete pendingSendArbitrationRounds->get(i);
+               }
+               delete pendingSendArbitrationRounds;
+       }
        if (lastTransactionPartsSent != NULL)
                delete lastTransactionPartsSent;
        delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
@@ -461,6 +481,8 @@ bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
 
 void Table::startTransaction() {
        // Create a new transaction, invalidates any old pending transactions.
+       if (pendingTransactionBuilder != NULL)
+               delete pendingTransactionBuilder;
        pendingTransactionBuilder = new PendingTransaction(localMachineId);
 }
 
@@ -503,9 +525,12 @@ TransactionStatus *Table::commitTransaction() {
                pendingTransactionQueue->add(newTransaction);
        } else {
                arbitrateOnLocalTransaction(newTransaction);
+               delete newTransaction;
                updateLiveStateFromLocal();
        }
-
+       if (pendingTransactionBuilder != NULL)
+               delete pendingTransactionBuilder;
+       
        pendingTransactionBuilder = new PendingTransaction(localMachineId);
 
        try {
@@ -535,6 +560,7 @@ TransactionStatus *Table::commitTransaction() {
 
                                if (sendReturn.getSecond()) {
                                        // did arbitrate
+                                       delete transaction;
                                        oldindex--;
                                }
                        }
@@ -722,7 +748,7 @@ bool Table::sendToServer(NewKey *newKey) {
                        }
 
                        // Create the slot
-                       Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber);
+                       Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, new Array<char>(buffer->getSlot(sequenceNumber)->getHMAC()), localSequenceNumber);
                        localSequenceNumber++;
 
                        // Try to fill the slot with data
@@ -785,7 +811,8 @@ bool Table::sendToServer(NewKey *newKey) {
                                                //Add part back in
                                                pendingSendArbitrationRounds->set(oldcount++,
                                                                                                                                                                                        pendingSendArbitrationRounds->get(i));
-                                       }
+                                       } else
+                                               delete pendingSendArbitrationRounds->get(i);
                                }
                                pendingSendArbitrationRounds->setSize(oldcount);
 
@@ -1437,10 +1464,13 @@ void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resi
                                s->addEntry(liveentry);
                        else {
                                skipcount++;
-                               if (skipcount > Table_SKIP_THRESHOLD)
+                               if (skipcount > Table_SKIP_THRESHOLD) {
+                                       delete liveentries;
                                        goto donesearch;
+                               }
                        }
                }
+               delete liveentries;
        }
 donesearch:
        ;
@@ -1502,10 +1532,11 @@ void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal
                // must have a last message message-> If not then the server is
                // hiding slots
                if (!machineSet->isEmpty()) {
+                       delete machineSet;
                        throw new Error("Missing record for machines: ");
                }
        }
-
+       delete machineSet;
        // Update the size of our local block chain->
        commitNewMaxSize();
 
@@ -1656,14 +1687,14 @@ void Table::processNewTransactionParts() {
                        if (transaction == NULL) {
                                // This is a new transaction that we dont have so make a new one
                                transaction = new Transaction();
+                               
+                               // Add that part to the transaction
+                               transaction->addPartDecode(part);
 
                                // Insert this new transaction into the live tables
                                liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
-                               liveTransactionByTransactionIdTable->put(new Pair<int64_t, int64_t>(part->getTransactionId()), transaction);
+                               liveTransactionByTransactionIdTable->put(transaction->getId(), transaction);
                        }
-
-                       // Add that part to the transaction
-                       transaction->addPartDecode(part);
                }
                delete ptit;
        }
@@ -1724,14 +1755,12 @@ void Table::arbitrateFromServer() {
                        continue;
                }
 
-
                if (!transaction->isComplete()) {
                        // Will arbitrate in incorrect order if we continue so just break
                        // Most likely this
                        break;
                }
 
-
                // update the largest transaction seen by arbitrator from server
                if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
                        lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
@@ -1744,7 +1773,6 @@ void Table::arbitrateFromServer() {
 
                if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
                        // Guard evaluated as true
-
                        // Update the local changes so we can make the commit
                        SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
                        while (kvit->hasNext()) {
@@ -1941,7 +1969,7 @@ Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
 }
 
 /**
- * Compacts the arbitration data my merging commits and aggregating
+ * Compacts the arbitration data by merging commits and aggregating
  * aborts so that a single large push of commits can be done instead
  * of many small updates
  */
@@ -1996,6 +2024,16 @@ bool Table::compactArbitrationData() {
                        }
 
                        // Set the new compacted part
+                       if (lastRound->getCommit() == newCommit)
+                               lastRound->setCommit(NULL);
+                       if (round->getCommit() == newCommit)
+                               round->setCommit(NULL);
+                       
+                       if (lastRound->getCommit() != NULL) {
+                               Commit * oldcommit = lastRound->getCommit();
+                               lastRound->setCommit(NULL);
+                               delete oldcommit;
+                       }
                        lastRound->setCommit(newCommit);
                        lastRound->addAborts(round->getAborts());
                        gotNewCommit = true;
@@ -2007,15 +2045,11 @@ bool Table::compactArbitrationData() {
        if (numberToDelete != 1) {
                // If there is a compaction
                // Delete the previous pieces that are now in the new compacted piece
-               if (numberToDelete == pendingSendArbitrationRounds->size()) {
-                       pendingSendArbitrationRounds->clear();
-               } else {
-                       for (uint i = 0; i < numberToDelete; i++) {
-                               pendingSendArbitrationRounds->removeIndex(pendingSendArbitrationRounds->size() - 1);
-                       }
+               for (uint i = 2; i <= numberToDelete; i++) {
+                       delete pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size()-i);
                }
+               pendingSendArbitrationRounds->setSize(pendingSendArbitrationRounds->size() - numberToDelete);
 
-               // Add the new compacted into the pending to send list
                pendingSendArbitrationRounds->add(lastRound);
 
                // Should reinsert into the commit processor
@@ -2154,7 +2188,6 @@ bool Table::updateCommittedTable() {
                        // We have already seen this commit before so need to do the
                        // full processing on this commit
                        if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
-
                                // Update the last transaction that was updated if we can
                                if (commit->getTransactionSequenceNumber() != -1) {
                                        int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
@@ -2163,7 +2196,6 @@ bool Table::updateCommittedTable() {
                                                lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
                                        }
                                }
-
                                continue;
                        }
 
@@ -2209,6 +2241,7 @@ bool Table::updateCommittedTable() {
                                }
                        }
                        delete commitit;
+                       delete commitsToEdit;
 
                        // Update the last seen sequence number from this arbitrator
                        if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) {
@@ -2227,6 +2260,11 @@ bool Table::updateCommittedTable() {
                                SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
                                while (kvit->hasNext()) {
                                        KeyValue *kv = kvit->next();
+                                       printf("Commited KeyValue Table update for %p\n", this);
+                                       kv->getKey()->print();
+                                       printf("\n");
+                                       kv->getValue()->print();
+                                       printf("\n");
                                        committedKeyValueTable->put(kv->getKey(), kv);
                                        liveCommitsByKeyTable->put(kv->getKey(), commit);
                                }
@@ -2575,6 +2613,7 @@ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
        if (deviceWatchSet->isEmpty()) {
                // This rejected message has been seen by all the clients so
                entry->setDead();
+               delete deviceWatchSet;
        } else {
                // We need to watch this rejected message
                entry->setWatchSet(deviceWatchSet);
@@ -2685,7 +2724,8 @@ void Table::processEntry(TransactionPart *entry) {
 void Table::processEntry(CommitPart *entry) {
        // Update the last transaction that was updated if we can
        if (entry->getTransactionSequenceNumber() != -1) {
-               if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId() || lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber())) {
+               if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId()) ||
+                               lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber()) {
                        lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
                }
        }
@@ -2698,7 +2738,7 @@ void Table::processEntry(CommitPart *entry) {
        }
        // Update the part and set dead ones we have already seen (got a
        // rescued version)
-       CommitPart *previouslySeenPart = commitPart->put(new Pair<int64_t, int32_t>(entry->getPartId()), entry);
+       CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
        if (previouslySeenPart != NULL) {
                previouslySeenPart->setDead();
        }
@@ -2732,7 +2772,6 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liven
                                rmit->remove();
                                // Decrement machines that need to see this notification
                                rm->removeWatcher(machineId);
-                               delete rm;
                        }
                }
                delete rmit;
index d316aa1..80e70cd 100644 (file)
@@ -26,6 +26,12 @@ Transaction::Transaction() :
        hadServerFailure(false) {
 }
 
+Transaction::~Transaction() {
+       delete parts;
+       delete keyValueGuardSet;
+       delete keyValueUpdateSet;
+}
+
 void Transaction::addPartEncode(TransactionPart *newPart) {
        TransactionPart *old = parts->setExpand(newPart->getPartNumber(), newPart);
        if (old == NULL)
index 08eebea..57ce43a 100644 (file)
@@ -28,6 +28,7 @@ private:
 
 public:
        Transaction();
+       ~Transaction();
        void addPartEncode(TransactionPart *newPart);
        void addPartDecode(TransactionPart *newPart);
        void addUpdateKV(KeyValue *kv);
index d80a102..a3a5a7a 100644 (file)
@@ -11,7 +11,7 @@ import java.util.ArrayList;
 
 public class Test {
 
-    public static final  int NUMBER_OF_TESTS = 1000
+    public static final  int NUMBER_OF_TESTS = 2
 
     public static void main(String[] args)  throws ServerException {
         if (args[0].equals("2")) {