edits
[iotcloud.git] / version2 / src / C / Table.cc
index 88f43ee75ddef8c3feb817c91074c07fbcec71ec..a5660091fd614018bc10f7966e0b8cee66c83308 100644 (file)
@@ -168,15 +168,68 @@ Table::~Table() {
        delete pendingTransactionSpeculatedKeyValueTable;
        delete liveNewKeyTable;
        delete lastMessageTable;
-       delete rejectedMessageWatchVectorTable;
+       {
+               SetIterator<int64_t, Hashset<RejectedMessage *> *> *rmit = getKeyIterator(rejectedMessageWatchVectorTable);
+               while(rmit->hasNext()) {
+                       int64_t machineid = rmit->next();
+                       Hashset<RejectedMessage *> * rmset = rejectedMessageWatchVectorTable->get(machineid);
+                       SetIterator<RejectedMessage *, RejectedMessage *> * mit = rmset->iterator();
+                       while (mit->hasNext()) {
+                               RejectedMessage * rm = mit->next();
+                               delete rm;
+                       }
+                       delete mit;
+                       delete rmset;
+               }
+               delete rmit;
+               delete rejectedMessageWatchVectorTable;
+       }
        delete arbitratorTable;
        delete liveAbortTable;
-       delete newTransactionParts;
-       delete newCommitParts;
+       {
+               SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts);
+               while (partsit->hasNext()) {
+                       int64_t machineId = partsit->next();
+                       Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
+                       delete parts;
+               }
+               delete partsit;
+               delete newTransactionParts;
+       }
+       {
+               SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
+               while (partsit->hasNext()) {
+                       int64_t machineId = partsit->next();
+                       Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId);
+                       delete parts;
+               }
+               delete partsit;
+               delete newCommitParts;
+       }
        delete lastArbitratedTransactionNumberByArbitratorTable;
        delete liveTransactionBySequenceNumberTable;
        delete liveTransactionByTransactionIdTable;
-       delete liveCommitsTable;
+       {
+               SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
+               while (liveit->hasNext()) {
+                       int64_t arbitratorId = liveit->next();
+                       
+                       // Get all the commits for a specific arbitrator
+                       Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
+                       {
+                               SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
+                               while (clientit->hasNext()) {
+                                       int64_t id = clientit->next();
+                                       delete commitForClientTable->get(id);
+                               }
+                               delete clientit;
+                       }
+                       
+                       delete commitForClientTable;
+               }
+               delete liveit;
+               delete liveCommitsTable;
+       }
        delete liveCommitsByKeyTable;
        delete lastCommitSeenSequenceNumberByArbitratorTable;
        delete rejectedSlotVector;
@@ -189,6 +242,8 @@ Table::~Table() {
        delete localCommunicationTable;
        delete lastTransactionSeenFromMachineFromServer;
        delete pendingSendArbitrationRounds;
+       if (lastTransactionPartsSent != NULL)
+               delete lastTransactionPartsSent;
        delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
 }
 
@@ -253,10 +308,13 @@ void Table::initTable() {
                array->set(0, s);
                // update local block chain
                validateAndUpdate(array, true);
+               delete array;
        } else if (array->length() == 1) {
                // in case we did push the slot BUT we failed to init it
                validateAndUpdate(array, true);
+               delete array;
        } else {
+               delete array;
                throw new Error("Error on initialization");
        }
 }
@@ -269,6 +327,7 @@ void Table::rebuild() {
        // Just pull the latest slots from the server
        Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
        validateAndUpdate(newslots, true);
+       delete newslots;
        sendToServer(NULL);
        updateLiveTransactionsAndStatus();
 }
@@ -289,7 +348,7 @@ IoTString *Table::getCommitted(IoTString *key)  {
        KeyValue *kv = committedKeyValueTable->get(key);
 
        if (kv != NULL) {
-               return kv->getValue();
+               return new IoTString(kv->getValue());
        } else {
                return NULL;
        }
@@ -307,7 +366,7 @@ IoTString *Table::getSpeculative(IoTString *key) {
        }
 
        if (kv != NULL) {
-               return kv->getValue();
+               return new IoTString(kv->getValue());
        } else {
                return NULL;
        }
@@ -328,7 +387,7 @@ IoTString *Table::getCommittedAtomic(IoTString *key) {
 
        if (kv != NULL) {
                pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
-               return kv->getValue();
+               return new IoTString(kv->getValue());
        } else {
                pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
                return NULL;
@@ -358,7 +417,7 @@ IoTString *Table::getSpeculativeAtomic(IoTString *key) {
 
        if (kv != NULL) {
                pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
-               return kv->getValue();
+               return new IoTString(kv->getValue());
        } else {
                pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
                return NULL;
@@ -369,6 +428,7 @@ bool Table::update()  {
        try {
                Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
                validateAndUpdate(newSlots, false);
+               delete newSlots;
                sendToServer(NULL);
                updateLiveTransactionsAndStatus();
                return true;
@@ -404,8 +464,7 @@ void Table::startTransaction() {
        pendingTransactionBuilder = new PendingTransaction(localMachineId);
 }
 
-void Table::addKV(IoTString *key, IoTString *value) {
-
+void Table::put(IoTString *key, IoTString *value) {
        // Make sure it is a valid key
        if (!arbitratorTable->contains(key)) {
                throw new Error("Key not Found.");
@@ -418,7 +477,7 @@ void Table::addKV(IoTString *key, IoTString *value) {
        }
 
        // Add the key value to this transaction
-       KeyValue *kv = new KeyValue(key, value);
+       KeyValue *kv = new KeyValue(new IoTString(key), new IoTString(value));
        pendingTransactionBuilder->addKV(kv);
 }
 
@@ -590,7 +649,6 @@ NewKey * Table::handlePartialSend(NewKey * newKey) {
                        // insert into the local block chain
                        validateAndUpdate(newSlots, true);
                }
-               // continue;
        } else {
                if (checkSend(newSlots, lastSlotAttemptedToSend)) {
                        if (newKey != NULL) {
@@ -642,6 +700,7 @@ NewKey * Table::handlePartialSend(NewKey * newKey) {
                // insert into the local block chain
                validateAndUpdate(newSlots, true);
        }
+       delete newSlots;
        return newKey;
 }
 
@@ -698,6 +757,8 @@ bool Table::sendToServer(NewKey *newKey) {
                        lastInsertedNewKey = insertedNewKey;
                        lastNewSize = newSize;
                        lastNewKey = newKey;
+                       if (lastTransactionPartsSent != NULL)
+                               delete lastTransactionPartsSent;
                        lastTransactionPartsSent = transactionPartsSent->clone();
                        lastPendingSendArbitrationEntriesToDelete = new Vector<Entry *>(pendingSendArbitrationEntriesToDelete);
 
@@ -772,6 +833,7 @@ bool Table::sendToServer(NewKey *newKey) {
                                // insert into the local block chain
                                validateAndUpdate(newSlots, true);
                        }
+                       delete newSlots;
                }
        } catch (ServerException *e) {
                if (e->getType() != ServerException_TypeInputTimeout) {
@@ -1426,7 +1488,8 @@ void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal
                        updateExpectedSize();
                }
        }
-
+       delete indexer;
+       
        // If there is a gap, check to see if the server sent us
        // everything->
        if (firstSeqNum != (sequenceNumber + 1)) {
@@ -1607,7 +1670,16 @@ void Table::processNewTransactionParts() {
        delete tpit;
        // Clear all the new transaction parts in preparation for the next
        // time the server sends slots
-       newTransactionParts->clear();
+       {
+               SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts);
+               while (partsit->hasNext()) {
+                       int64_t machineId = partsit->next();
+                       Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
+                       delete parts;
+               }
+               delete partsit;
+               newTransactionParts->clear();
+       }
 }
 
 void Table::arbitrateFromServer() {
@@ -1718,7 +1790,7 @@ void Table::arbitrateFromServer() {
                        newCommit->addKV(kv);
                }
                delete spit;
-
+               
                // create the commit parts
                newCommit->createCommitParts();
 
@@ -1732,6 +1804,7 @@ void Table::arbitrateFromServer() {
                        processEntry(commitPart);
                }
        }
+       delete speculativeTableTmp;
 
        if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
                ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
@@ -1999,6 +2072,7 @@ bool Table::updateCommittedTable() {
                        commit->addPartDecode(part);
                }
                delete pairit;
+               delete parts;
        }
        delete partsit;
 
@@ -2052,6 +2126,7 @@ bool Table::updateCommittedTable() {
                                        // Delete it and move on
                                        commit->setDead();
                                        commitForClientTable->remove(commit->getSequenceNumber());
+                                       delete commit;
                                        continue;
                                }
                        }
@@ -2129,6 +2204,7 @@ bool Table::updateCommittedTable() {
                                        // if the commit is now dead then remove it
                                        if (!previousCommit->isLive()) {
                                                commitForClientTable->remove(previousCommit->getSequenceNumber());
+                                               delete previousCommit;
                                        }
                                }
                        }
@@ -2328,6 +2404,7 @@ void Table::updateLiveTransactionsAndStatus() {
                                // Remove the transaction from the live table
                                iter->remove();
                                liveTransactionByTransactionIdTable->remove(transaction->getId());
+                               delete transaction;
                        }
                }
                delete iter;
@@ -2655,6 +2732,7 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liven
                                rmit->remove();
                                // Decrement machines that need to see this notification
                                rm->removeWatcher(machineId);
+                               delete rm;
                        }
                }
                delete rmit;