X-Git-Url: http://plrg.eecs.uci.edu/git/?p=iotcloud.git;a=blobdiff_plain;f=version2%2Fsrc%2FC%2FTable.cc;h=1a33f76f1f1bc98f7e8c267e90e4ddc515655bc0;hp=7c3fde05215cfeb3b547f59d8064e38039b7ef51;hb=0173578905303681df8ea5f3c35b3ead109c8ba8;hpb=d28d6cb0b30fcb629eb66feb8506c7e76a3652f8 diff --git a/version2/src/C/Table.cc b/version2/src/C/Table.cc index 7c3fde0..1a33f76 100644 --- a/version2/src/C/Table.cc +++ b/version2/src/C/Table.cc @@ -60,7 +60,6 @@ Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, i lastIsNewKey(false), lastNewSize(0), lastTransactionPartsSent(NULL), - lastPendingSendArbitrationEntriesToDelete(NULL), lastNewKey(NULL), committedKeyValueTable(NULL), speculatedKeyValueTable(NULL), @@ -123,7 +122,6 @@ Table::Table(CloudComm *_cloud, int64_t _localMachineId) : lastIsNewKey(false), lastNewSize(0), lastTransactionPartsSent(NULL), - lastPendingSendArbitrationEntriesToDelete(NULL), lastNewKey(NULL), committedKeyValueTable(NULL), speculatedKeyValueTable(NULL), @@ -167,29 +165,115 @@ Table::~Table() { delete speculatedKeyValueTable; delete pendingTransactionSpeculatedKeyValueTable; delete liveNewKeyTable; - delete lastMessageTable; - delete rejectedMessageWatchVectorTable; + { + SetIterator *> *lmit = getKeyIterator(lastMessageTable); + while (lmit->hasNext()) { + Pair * pair = lastMessageTable->get(lmit->next()); + delete pair; + } + delete lmit; + delete lastMessageTable; + } + if (pendingTransactionBuilder != NULL) + delete pendingTransactionBuilder; + { + SetIterator *> *rmit = getKeyIterator(rejectedMessageWatchVectorTable); + while(rmit->hasNext()) { + int64_t machineid = rmit->next(); + Hashset * rmset = rejectedMessageWatchVectorTable->get(machineid); + SetIterator * mit = rmset->iterator(); + while (mit->hasNext()) { + RejectedMessage * rm = mit->next(); + delete rm; + } + delete mit; + delete rmset; + } + delete rmit; + delete rejectedMessageWatchVectorTable; + } delete arbitratorTable; delete liveAbortTable; - delete newTransactionParts; - delete newCommitParts; + { + SetIterator *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts); + while (partsit->hasNext()) { + int64_t machineId = partsit->next(); + Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal(); + delete parts; + } + delete partsit; + delete newTransactionParts; + } + { + SetIterator *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts); + while (partsit->hasNext()) { + int64_t machineId = partsit->next(); + Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal(); + delete parts; + } + delete partsit; + delete newCommitParts; + } delete lastArbitratedTransactionNumberByArbitratorTable; delete liveTransactionBySequenceNumberTable; delete liveTransactionByTransactionIdTable; - delete liveCommitsTable; + { + SetIterator *> *liveit = getKeyIterator(liveCommitsTable); + while (liveit->hasNext()) { + int64_t arbitratorId = liveit->next(); + + // Get all the commits for a specific arbitrator + Hashtable *commitForClientTable = liveit->currVal(); + { + SetIterator *clientit = getKeyIterator(commitForClientTable); + while (clientit->hasNext()) { + int64_t id = clientit->next(); + delete commitForClientTable->get(id); + } + delete clientit; + } + + delete commitForClientTable; + } + delete liveit; + delete liveCommitsTable; + } delete liveCommitsByKeyTable; delete lastCommitSeenSequenceNumberByArbitratorTable; delete rejectedSlotVector; - delete pendingTransactionQueue; + { + uint size = pendingTransactionQueue->size(); + for (uint iter = 0; iter < size; iter++) { + delete pendingTransactionQueue->get(iter); + } + delete pendingTransactionQueue; + } delete pendingSendArbitrationEntriesToDelete; - delete transactionPartsSent; + { + SetIterator *> *trit = getKeyIterator(transactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); + delete trit->currVal(); + } + delete trit; + delete transactionPartsSent; + } delete outstandingTransactionStatus; delete liveAbortsGeneratedByLocal; delete offlineTransactionsCommittedAndAtServer; delete localCommunicationTable; delete lastTransactionSeenFromMachineFromServer; - delete pendingSendArbitrationRounds; + { + for(uint i = 0; i < pendingSendArbitrationRounds->size(); i++) { + delete pendingSendArbitrationRounds->get(i); + } + delete pendingSendArbitrationRounds; + } + if (lastTransactionPartsSent != NULL) + delete lastTransactionPartsSent; delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator; + if (lastNewKey) + delete lastNewKey; } /** @@ -245,7 +329,7 @@ void Table::initTable() { Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber); localSequenceNumber++; TableStatus *status = new TableStatus(s, numberOfSlots); - s->addEntry(status); + s->addShallowEntry(status); Array *array = cloud->putSlot(s, numberOfSlots); if (array == NULL) { @@ -253,10 +337,15 @@ void Table::initTable() { array->set(0, s); // update local block chain validateAndUpdate(array, true); + delete array; } else if (array->length() == 1) { // in case we did push the slot BUT we failed to init it validateAndUpdate(array, true); + delete s; + delete array; } else { + delete s; + delete array; throw new Error("Error on initialization"); } } @@ -269,6 +358,7 @@ void Table::rebuild() { // Just pull the latest slots from the server Array *newslots = cloud->getSlots(sequenceNumber + 1); validateAndUpdate(newslots, true); + delete newslots; sendToServer(NULL); updateLiveTransactionsAndStatus(); } @@ -289,7 +379,7 @@ IoTString *Table::getCommitted(IoTString *key) { KeyValue *kv = committedKeyValueTable->get(key); if (kv != NULL) { - return kv->getValue(); + return new IoTString(kv->getValue()); } else { return NULL; } @@ -307,7 +397,7 @@ IoTString *Table::getSpeculative(IoTString *key) { } if (kv != NULL) { - return kv->getValue(); + return new IoTString(kv->getValue()); } else { return NULL; } @@ -328,7 +418,7 @@ IoTString *Table::getCommittedAtomic(IoTString *key) { if (kv != NULL) { pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue())); - return kv->getValue(); + return new IoTString(kv->getValue()); } else { pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL)); return NULL; @@ -358,7 +448,7 @@ IoTString *Table::getSpeculativeAtomic(IoTString *key) { if (kv != NULL) { pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue())); - return kv->getValue(); + return new IoTString(kv->getValue()); } else { pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL)); return NULL; @@ -369,6 +459,7 @@ bool Table::update() { try { Array *newSlots = cloud->getSlots(sequenceNumber + 1); validateAndUpdate(newSlots, false); + delete newSlots; sendToServer(NULL); updateLiveTransactionsAndStatus(); return true; @@ -401,11 +492,12 @@ bool Table::createNewKey(IoTString *keyName, int64_t machineId) { void Table::startTransaction() { // Create a new transaction, invalidates any old pending transactions. + if (pendingTransactionBuilder != NULL) + delete pendingTransactionBuilder; pendingTransactionBuilder = new PendingTransaction(localMachineId); } -void Table::addKV(IoTString *key, IoTString *value) { - +void Table::put(IoTString *key, IoTString *value) { // Make sure it is a valid key if (!arbitratorTable->contains(key)) { throw new Error("Key not Found."); @@ -418,7 +510,7 @@ void Table::addKV(IoTString *key, IoTString *value) { } // Add the key value to this transaction - KeyValue *kv = new KeyValue(key, value); + KeyValue *kv = new KeyValue(new IoTString(key), new IoTString(value)); pendingTransactionBuilder->addKV(kv); } @@ -444,9 +536,12 @@ TransactionStatus *Table::commitTransaction() { pendingTransactionQueue->add(newTransaction); } else { arbitrateOnLocalTransaction(newTransaction); + delete newTransaction; updateLiveStateFromLocal(); } - + if (pendingTransactionBuilder != NULL) + delete pendingTransactionBuilder; + pendingTransactionBuilder = new PendingTransaction(localMachineId); try { @@ -476,6 +571,7 @@ TransactionStatus *Table::commitTransaction() { if (sendReturn.getSecond()) { // did arbitrate + delete transaction; oldindex--; } } @@ -500,6 +596,36 @@ int64_t Table::getLocalSequenceNumber() { return localSequenceNumber; } +void Table::processTransactionList(bool handlePartial) { + SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); + transaction->resetServerFailure(); + // Update which transactions parts still need to be sent + transaction->removeSentParts(lastTransactionPartsSent->get(transaction)); + // Add the transaction status to the outstanding list + outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus()); + + // Update the transaction status + transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial); + + // Check if all the transaction parts were successfully + // sent and if so then remove it from pending + if (transaction->didSendAllParts()) { + transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully); + pendingTransactionQueue->remove(transaction); + delete transaction; + } else if (handlePartial) { + transaction->resetServerFailure(); + // Set the transaction sequence number back to nothing + if (!transaction->didSendAPartToServer()) { + transaction->setSequenceNumber(-1); + } + } + } + delete trit; +} + NewKey * Table::handlePartialSend(NewKey * newKey) { //Didn't receive acknowledgement for last send //See if the server has received a newer slot @@ -511,97 +637,23 @@ NewKey * Table::handlePartialSend(NewKey * newKey) { bool sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey, &wasInserted, &newSlots); if (sendSlotsReturn) { + lastSlotAttemptedToSend = NULL; if (newKey != NULL) { if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) { + delete newKey; newKey = NULL; } } - - SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); - while (trit->hasNext()) { - Transaction *transaction = trit->next(); - transaction->resetServerFailure(); - // Update which transactions parts still need to be sent - transaction->removeSentParts(lastTransactionPartsSent->get(transaction)); - // Add the transaction status to the outstanding list - outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus()); - - // Update the transaction status - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial); - - // Check if all the transaction parts were successfully - // sent and if so then remove it from pending - if (transaction->didSendAllParts()) { - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully); - pendingTransactionQueue->remove(transaction); - } - } - delete trit; + processTransactionList(false); } else { - bool isInserted = false; - for (uint si = 0; si < newSlots->length(); si++) { - Slot *s = newSlots->get(si); - if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) { - isInserted = true; - break; - } - } - - for (uint si = 0; si < newSlots->length(); si++) { - Slot *s = newSlots->get(si); - if (isInserted) { - break; - } - - // Process each entry in the slot - Vector *ventries = s->getEntries(); - uint vesize = ventries->size(); - for (uint vei = 0; vei < vesize; vei++) { - Entry *entry = ventries->get(vei); - if (entry->getType() == TypeLastMessage) { - LastMessage *lastMessage = (LastMessage *)entry; - if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) { - isInserted = true; - break; - } - } - } - } - - if (isInserted) { + if (checkSend(newSlots, lastSlotAttemptedToSend)) { if (newKey != NULL) { if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) { + delete newKey; newKey = NULL; } } - - SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); - while (trit->hasNext()) { - Transaction *transaction = trit->next(); - transaction->resetServerFailure(); - - // Update which transactions parts still need to be sent - transaction->removeSentParts(lastTransactionPartsSent->get(transaction)); - - // Add the transaction status to the outstanding list - outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus()); - - // Update the transaction status - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial); - - // Check if all the transaction parts were successfully sent and if so then remove it from pending - if (transaction->didSendAllParts()) { - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully); - pendingTransactionQueue->remove(transaction); - } else { - transaction->resetServerFailure(); - // Set the transaction sequence number back to nothing - if (!transaction->didSendAPartToServer()) { - transaction->setSequenceNumber(-1); - } - } - } - delete trit; + processTransactionList(true); } } @@ -620,73 +672,16 @@ NewKey * Table::handlePartialSend(NewKey * newKey) { // insert into the local block chain validateAndUpdate(newSlots, true); } - // continue; } else { - bool isInserted = false; - for (uint si = 0; si < newSlots->length(); si++) { - Slot *s = newSlots->get(si); - if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) { - isInserted = true; - break; - } - } - - for (uint si = 0; si < newSlots->length(); si++) { - Slot *s = newSlots->get(si); - if (isInserted) { - break; - } - - // Process each entry in the slot - Vector *entries = s->getEntries(); - uint eSize = entries->size(); - for (uint ei = 0; ei < eSize; ei++) { - Entry *entry = entries->get(ei); - - if (entry->getType() == TypeLastMessage) { - LastMessage *lastMessage = (LastMessage *)entry; - if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) { - isInserted = true; - break; - } - } - } - } - - if (isInserted) { + if (checkSend(newSlots, lastSlotAttemptedToSend)) { if (newKey != NULL) { if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) { + delete newKey; newKey = NULL; } } - - SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); - while (trit->hasNext()) { - Transaction *transaction = trit->next(); - transaction->resetServerFailure(); - - // Update which transactions parts still need to be sent - transaction->removeSentParts(lastTransactionPartsSent->get(transaction)); - - // Add the transaction status to the outstanding list - outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus()); - - // Update the transaction status - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial); - - // Check if all the transaction parts were successfully sent and if so then remove it from pending - if (transaction->didSendAllParts()) { - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully); - pendingTransactionQueue->remove(transaction); - } else { - transaction->resetServerFailure(); - // Set the transaction sequence number back to nothing - if (!transaction->didSendAPartToServer()) { - transaction->setSequenceNumber(-1); - } - } - } - delete trit; + + processTransactionList(true); } else { SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); while (trit->hasNext()) { @@ -703,9 +698,22 @@ NewKey * Table::handlePartialSend(NewKey * newKey) { // insert into the local block chain validateAndUpdate(newSlots, true); } + delete newSlots; return newKey; } +void Table::clearSentParts() { + // Clear the sent data since we are trying again + pendingSendArbitrationEntriesToDelete->clear(); + SetIterator *> *trit = getKeyIterator(transactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); + delete trit->currVal(); + } + delete trit; + transactionPartsSent->clear(); +} + bool Table::sendToServer(NewKey *newKey) { if (hadPartialSendToServer) { newKey = handlePartialSend(newKey); @@ -720,18 +728,18 @@ bool Table::sendToServer(NewKey *newKey) { // If there is a new key with same name then end if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) { + delete newKey; return false; } // Create the slot - Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber); + Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, new Array(buffer->getSlot(sequenceNumber)->getHMAC()), localSequenceNumber); localSequenceNumber++; // Try to fill the slot with data - ThreeTuple fillSlotsReturn = fillSlot(slot, false, newKey); - bool needsResize = fillSlotsReturn.getFirst(); - int newSize = fillSlotsReturn.getSecond(); - bool insertedNewKey = fillSlotsReturn.getThird(); + int newSize = 0; + bool insertedNewKey = false; + bool needsResize = fillSlot(slot, false, newKey, newSize, insertedNewKey); if (needsResize) { // Reset which transaction to send @@ -748,31 +756,34 @@ bool Table::sendToServer(NewKey *newKey) { delete trit; // Clear the sent data since we are trying again - pendingSendArbitrationEntriesToDelete->clear(); - transactionPartsSent->clear(); - + clearSentParts(); + // We needed a resize so try again - fillSlot(slot, true, newKey); + fillSlot(slot, true, newKey, newSize, insertedNewKey); } - + if (lastSlotAttemptedToSend != NULL) + delete lastSlotAttemptedToSend; + lastSlotAttemptedToSend = slot; lastIsNewKey = (newKey != NULL); lastInsertedNewKey = insertedNewKey; lastNewSize = newSize; + if (( newKey != lastNewKey) && (lastNewKey != NULL)) + delete lastNewKey; lastNewKey = newKey; + if (lastTransactionPartsSent != NULL) + delete lastTransactionPartsSent; lastTransactionPartsSent = transactionPartsSent->clone(); - lastPendingSendArbitrationEntriesToDelete = new Vector(pendingSendArbitrationEntriesToDelete); Array * newSlots = NULL; bool wasInserted = false; bool sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL, &wasInserted, &newSlots); if (sendSlotsReturn) { + lastSlotAttemptedToSend = NULL; // Did insert into the block chain - if (insertedNewKey) { // This slot was what was inserted not a previous slot - // New Key was successfully inserted into the block chain so dont want to insert it again newKey = NULL; } @@ -785,34 +796,14 @@ bool Table::sendToServer(NewKey *newKey) { round->removeParts(pendingSendArbitrationEntriesToDelete); if (!round->isDoneSending()) { - // Sent all the parts + //Add part back in pendingSendArbitrationRounds->set(oldcount++, pendingSendArbitrationRounds->get(i)); - } + } else + delete pendingSendArbitrationRounds->get(i); } pendingSendArbitrationRounds->setSize(oldcount); - - SetIterator *> *trit = getKeyIterator(transactionPartsSent); - while (trit->hasNext()) { - Transaction *transaction = trit->next(); - transaction->resetServerFailure(); - - // Update which transactions parts still need to be sent - transaction->removeSentParts(transactionPartsSent->get(transaction)); - - // Add the transaction status to the outstanding list - outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus()); - - // Update the transaction status - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial); - - // Check if all the transaction parts were successfully sent and if so then remove it from pending - if (transaction->didSendAllParts()) { - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully); - pendingTransactionQueue->remove(transaction); - } - } - delete trit; + processTransactionList(false); } else { // Reset which transaction to send SetIterator *> *trit = getKeyIterator(transactionPartsSent); @@ -829,15 +820,14 @@ bool Table::sendToServer(NewKey *newKey) { } // Clear the sent data in preparation for next send - pendingSendArbitrationEntriesToDelete->clear(); - transactionPartsSent->clear(); + clearSentParts(); if (newSlots->length() != 0) { // insert into the local block chain validateAndUpdate(newSlots, true); } + delete newSlots; } - } catch (ServerException *e) { if (e->getType() != ServerException_TypeInputTimeout) { // Nothing was able to be sent to the server so just clear these data structures @@ -866,8 +856,7 @@ bool Table::sendToServer(NewKey *newKey) { delete trit; } - pendingSendArbitrationEntriesToDelete->clear(); - transactionPartsSent->clear(); + clearSentParts(); throw e; } @@ -1147,6 +1136,40 @@ Array *Table::acceptDataFromLocal(Array *data) { return returnData; } +/** Checks whether a given slot was sent using new slots in + array. Returns true if sent and false otherwise. */ + +bool Table::checkSend(Array * array, Slot *checkSlot) { + uint size = array->length(); + for (uint i = 0; i < size; i++) { + Slot *s = array->get(i); + if ((s->getSequenceNumber() == checkSlot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) { + return true; + } + } + + //Also need to see if other machines acknowledged our message + for (uint i = 0; i < size; i++) { + Slot *s = array->get(i); + + // Process each entry in the slot + Vector *entries = s->getEntries(); + uint eSize = entries->size(); + for (uint ei = 0; ei < eSize; ei++) { + Entry *entry = entries->get(ei); + + if (entry->getType() == TypeLastMessage) { + LastMessage *lastMessage = (LastMessage *)entry; + + if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == checkSlot->getSequenceNumber())) { + return true; + } + } + } + } + //Not found + return false; +} /** Method tries to send slot to server. Returns status in tuple. isInserted returns whether last un-acked send (if any) was @@ -1169,39 +1192,8 @@ bool Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey, bool *isIn } if (hadPartialSendToServer) { - *isInserted = false; - uint size = (*array)->length(); - for (uint i = 0; i < size; i++) { - Slot *s = (*array)->get(i); - if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) { - *isInserted = true; - break; - } - } + *isInserted = checkSend(*array, slot); - //Also need to see if other machines acknowledged our message - if (!(*isInserted)) { - for (uint i = 0; i < size; i++) { - Slot *s = (*array)->get(i); - - // Process each entry in the slot - Vector *entries = s->getEntries(); - uint eSize = entries->size(); - for (uint ei = 0; ei < eSize; ei++) { - Entry *entry = entries->get(ei); - - if (entry->getType() == TypeLastMessage) { - LastMessage *lastMessage = (LastMessage *)entry; - - if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) { - *isInserted = true; - goto done; - } - } - } - } - } - done: if (!(*isInserted)) { rejectedSlotVector->add(slot->getSequenceNumber()); } @@ -1216,10 +1208,10 @@ bool Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey, bool *isIn } /** - * Returns false if a resize was needed + * Returns true if a resize was needed but not done. */ -ThreeTuple Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) { - int newSize = 0; +bool Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry, int & newSize, bool & insertedKey) { + newSize = 0;//special value to indicate no resize if (liveSlotCount > bufferResizeThreshold) { resize = true;//Resize is forced } @@ -1227,14 +1219,14 @@ ThreeTuple Table::fillSlot(Slot *slot, bool resize, NewKey if (resize) { newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE); TableStatus *status = new TableStatus(slot, newSize); - slot->addEntry(status); + slot->addShallowEntry(status); } // Fill with rejected slots first before doing anything else doRejectedMessages(slot); // Do mandatory rescue of entries - ThreeTuple mandatoryRescueReturn = doMandatoryResuce(slot, resize); + ThreeTuple mandatoryRescueReturn = doMandatoryRescue(slot, resize); // Extract working variables bool needsResize = mandatoryRescueReturn.getFirst(); @@ -1242,22 +1234,21 @@ ThreeTuple Table::fillSlot(Slot *slot, bool resize, NewKey int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird(); if (needsResize && !resize) { - // We need to resize but we are not resizing so return false - return ThreeTuple(true, NULL, NULL); + // We need to resize but we are not resizing so return true to force on retry + return true; } - bool inserted = false; + insertedKey = false; if (newKeyEntry != NULL) { newKeyEntry->setSlot(slot); if (slot->hasSpace(newKeyEntry)) { slot->addEntry(newKeyEntry); - inserted = true; + insertedKey = true; } } // Clear the transactions, aborts and commits that were sent previously - transactionPartsSent->clear(); - pendingSendArbitrationEntriesToDelete->clear(); + clearSentParts(); uint size = pendingSendArbitrationRounds->size(); for (uint i = 0; i < size; i++) { ArbitrationRound *round = pendingSendArbitrationRounds->get(i); @@ -1323,7 +1314,7 @@ ThreeTuple Table::fillSlot(Slot *slot, bool resize, NewKey // Fill the remainder of the slot with rescue data doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize); - return ThreeTuple(false, newSize, inserted); + return false; } void Table::doRejectedMessages(Slot *s) { @@ -1336,7 +1327,7 @@ void Table::doRejectedMessages(Slot *s) { if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) { int64_t new_seqn = rejectedSlotVector->lastElement(); RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false); - s->addEntry(rm); + s->addShallowEntry(rm); } else { int64_t prev_seqn = -1; uint i = 0; @@ -1351,7 +1342,7 @@ void Table::doRejectedMessages(Slot *s) { /* Generate rejected message entry for missing messages */ if (prev_seqn != -1) { RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false); - s->addEntry(rm); + s->addShallowEntry(rm); } /* Generate rejected message entries for present messages */ for (; i < rejectedSlotVector->size(); i++) { @@ -1359,13 +1350,13 @@ void Table::doRejectedMessages(Slot *s) { Slot *s_msg = buffer->getSlot(curr_seqn); int64_t machineid = s_msg->getMachineID(); RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true); - s->addEntry(rm); + s->addShallowEntry(rm); } } } } -ThreeTuple Table::doMandatoryResuce(Slot *slot, bool resize) { +ThreeTuple Table::doMandatoryRescue(Slot *slot, bool resize) { int64_t newestSequenceNumber = buffer->getNewestSeqNum(); int64_t oldestSequenceNumber = buffer->getOldestSeqNum(); if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) { @@ -1437,10 +1428,13 @@ void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resi s->addEntry(liveentry); else { skipcount++; - if (skipcount > Table_SKIP_THRESHOLD) + if (skipcount > Table_SKIP_THRESHOLD) { + delete liveentries; goto donesearch; + } } } + delete liveentries; } donesearch: ; @@ -1488,7 +1482,8 @@ void Table::validateAndUpdate(Array *newSlots, bool acceptUpdatesToLocal updateExpectedSize(); } } - + delete indexer; + // If there is a gap, check to see if the server sent us // everything-> if (firstSeqNum != (sequenceNumber + 1)) { @@ -1501,10 +1496,11 @@ void Table::validateAndUpdate(Array *newSlots, bool acceptUpdatesToLocal // must have a last message message-> If not then the server is // hiding slots if (!machineSet->isEmpty()) { + delete machineSet; throw new Error("Missing record for machines: "); } } - + delete machineSet; // Update the size of our local block chain-> commitNewMaxSize(); @@ -1655,25 +1651,33 @@ void Table::processNewTransactionParts() { if (transaction == NULL) { // This is a new transaction that we dont have so make a new one transaction = new Transaction(); + + // Add that part to the transaction + transaction->addPartDecode(part); // Insert this new transaction into the live tables liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction); - liveTransactionByTransactionIdTable->put(new Pair(part->getTransactionId()), transaction); + liveTransactionByTransactionIdTable->put(transaction->getId(), transaction); } - - // Add that part to the transaction - transaction->addPartDecode(part); } delete ptit; } delete tpit; // Clear all the new transaction parts in preparation for the next // time the server sends slots - newTransactionParts->clear(); + { + SetIterator *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts); + while (partsit->hasNext()) { + int64_t machineId = partsit->next(); + Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId); + delete parts; + } + delete partsit; + newTransactionParts->clear(); + } } void Table::arbitrateFromServer() { - if (liveTransactionBySequenceNumberTable->size() == 0) { // Nothing to arbitrate on so move on return; @@ -1715,14 +1719,12 @@ void Table::arbitrateFromServer() { continue; } - if (!transaction->isComplete()) { // Will arbitrate in incorrect order if we continue so just break // Most likely this break; } - // update the largest transaction seen by arbitrator from server if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) { lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber()); @@ -1735,7 +1737,6 @@ void Table::arbitrateFromServer() { if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) { // Guard evaluated as true - // Update the local changes so we can make the commit SetIterator *kvit = transaction->getKeyValueUpdateSet()->iterator(); while (kvit->hasNext()) { @@ -1765,6 +1766,8 @@ void Table::arbitrateFromServer() { lastSeqNumArbOn = transactionSequenceNumber; } + delete transactionSequenceNumbers; + Commit *newCommit = NULL; // If there is something to commit @@ -1781,7 +1784,7 @@ void Table::arbitrateFromServer() { newCommit->addKV(kv); } delete spit; - + // create the commit parts newCommit->createCommitParts(); @@ -1795,6 +1798,7 @@ void Table::arbitrateFromServer() { processEntry(commitPart); } } + delete speculativeTableTmp; if ((newCommit != NULL) || (generatedAborts->size() > 0)) { ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts); @@ -1811,6 +1815,8 @@ void Table::arbitrateFromServer() { } } } + } else { + delete generatedAborts; } } @@ -1848,7 +1854,7 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { SetIterator *kvit = transaction->getKeyValueUpdateSet()->iterator(); while (kvit->hasNext()) { KeyValue *kv = kvit->next(); - newCommit->addKV(kv); + newCommit->addKV(kv->getCopy()); } delete kvit; @@ -1931,7 +1937,7 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { } /** - * Compacts the arbitration data my merging commits and aggregating + * Compacts the arbitration data by merging commits and aggregating * aborts so that a single large push of commits can be done instead * of many small updates */ @@ -1950,6 +1956,7 @@ bool Table::compactArbitrationData() { bool gotNewCommit = false; uint numberToDelete = 1; + while (numberToDelete < pendingSendArbitrationRounds->size()) { ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1); @@ -1981,11 +1988,23 @@ bool Table::compactArbitrationData() { newSize += round->getAbortsCount(); if (newSize > ArbitrationRound_MAX_PARTS) { - // Cant compact since it would be too large + // Can't compact since it would be too large + if (lastRound->getCommit() != newCommit && + round->getCommit() != newCommit) + delete newCommit; break; } - // Set the new compacted part + if (lastRound->getCommit() == newCommit) + lastRound->setCommit(NULL); + if (round->getCommit() == newCommit) + round->setCommit(NULL); + + if (lastRound->getCommit() != NULL) { + Commit * oldcommit = lastRound->getCommit(); + lastRound->setCommit(NULL); + delete oldcommit; + } lastRound->setCommit(newCommit); lastRound->addAborts(round->getAborts()); gotNewCommit = true; @@ -1997,15 +2016,11 @@ bool Table::compactArbitrationData() { if (numberToDelete != 1) { // If there is a compaction // Delete the previous pieces that are now in the new compacted piece - if (numberToDelete == pendingSendArbitrationRounds->size()) { - pendingSendArbitrationRounds->clear(); - } else { - for (uint i = 0; i < numberToDelete; i++) { - pendingSendArbitrationRounds->removeIndex(pendingSendArbitrationRounds->size() - 1); - } + for (uint i = 2; i <= numberToDelete; i++) { + delete pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size()-i); } + pendingSendArbitrationRounds->setSize(pendingSendArbitrationRounds->size() - numberToDelete); - // Add the new compacted into the pending to send list pendingSendArbitrationRounds->add(lastRound); // Should reinsert into the commit processor @@ -2022,7 +2037,6 @@ bool Table::compactArbitrationData() { * transactions */ bool Table::updateCommittedTable() { - if (newCommitParts->size() == 0) { // Nothing new to process return false; @@ -2063,6 +2077,7 @@ bool Table::updateCommittedTable() { commit->addPartDecode(part); } delete pairit; + delete parts; } delete partsit; @@ -2077,7 +2092,6 @@ bool Table::updateCommittedTable() { SetIterator *> *liveit = getKeyIterator(liveCommitsTable); while (liveit->hasNext()) { int64_t arbitratorId = liveit->next(); - // Get all the commits for a specific arbitrator Hashtable *commitForClientTable = liveCommitsTable->get(arbitratorId); @@ -2102,7 +2116,6 @@ bool Table::updateCommittedTable() { for (uint i = 0; i < commitSequenceNumbers->size(); i++) { int64_t commitSequenceNumber = commitSequenceNumbers->get(i); Commit *commit = commitForClientTable->get(commitSequenceNumber); - // Special processing if a commit is not complete if (!commit->isComplete()) { if (i == (commitSequenceNumbers->size() - 1)) { @@ -2116,6 +2129,7 @@ bool Table::updateCommittedTable() { // Delete it and move on commit->setDead(); commitForClientTable->remove(commit->getSequenceNumber()); + delete commit; continue; } } @@ -2143,7 +2157,6 @@ bool Table::updateCommittedTable() { // We have already seen this commit before so need to do the // full processing on this commit if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) { - // Update the last transaction that was updated if we can if (commit->getTransactionSequenceNumber() != -1) { int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()); @@ -2152,7 +2165,6 @@ bool Table::updateCommittedTable() { lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber()); } } - continue; } @@ -2193,10 +2205,12 @@ bool Table::updateCommittedTable() { // if the commit is now dead then remove it if (!previousCommit->isLive()) { commitForClientTable->remove(previousCommit->getSequenceNumber()); + delete previousCommit; } } } delete commitit; + delete commitsToEdit; // Update the last seen sequence number from this arbitrator if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) { @@ -2221,6 +2235,7 @@ bool Table::updateCommittedTable() { delete kvit; } } + delete commitSequenceNumbers; } delete liveit; @@ -2277,6 +2292,7 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { if (startIndex >= transactionSequenceNumbersSorted->size()) { // Make sure we are not out of bounds + delete transactionSequenceNumbersSorted; return false; // did not speculate } @@ -2315,6 +2331,8 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { } } + delete transactionSequenceNumbersSorted; + if (didSkip) { // Since there was a skip we need to redo the speculation next time around lastTransactionSequenceNumberSpeculatedOn = -1; @@ -2384,13 +2402,15 @@ void Table::updateLiveTransactionsAndStatus() { Transaction *transaction = liveTransactionBySequenceNumberTable->get(key); // Check if the transaction is dead - if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator()) && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator() >= transaction->getSequenceNumber())) { + if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator()) + && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator()) >= transaction->getSequenceNumber()) { // Set dead the transaction transaction->setDead(); // Remove the transaction from the live table iter->remove(); liveTransactionByTransactionIdTable->remove(transaction->getId()); + delete transaction; } } delete iter; @@ -2404,8 +2424,8 @@ void Table::updateLiveTransactionsAndStatus() { TransactionStatus *status = outstandingTransactionStatus->get(key); // Check if the transaction is dead - if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator()) && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) { - + if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator()) + && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) { // Set committed status->setStatus(TransactionStatus_StatusCommitted); @@ -2561,6 +2581,7 @@ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) { if (deviceWatchSet->isEmpty()) { // This rejected message has been seen by all the clients so entry->setDead(); + delete deviceWatchSet; } else { // We need to watch this rejected message entry->setWatchSet(deviceWatchSet); @@ -2659,7 +2680,7 @@ void Table::processEntry(TransactionPart *entry) { // Update the part and set dead ones we have already seen (got a // rescued version) - TransactionPart *previouslySeenPart = transactionPart->put(new Pair(entry->getPartId()), entry); + TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry); if (previouslySeenPart != NULL) { previouslySeenPart->setDead(); } @@ -2671,7 +2692,8 @@ void Table::processEntry(TransactionPart *entry) { void Table::processEntry(CommitPart *entry) { // Update the last transaction that was updated if we can if (entry->getTransactionSequenceNumber() != -1) { - if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId() || lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber())) { + if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId()) || + lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber()) { lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber()); } } @@ -2684,7 +2706,7 @@ void Table::processEntry(CommitPart *entry) { } // Update the part and set dead ones we have already seen (got a // rescued version) - CommitPart *previouslySeenPart = commitPart->put(new Pair(entry->getPartId()), entry); + CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry); if (previouslySeenPart != NULL) { previouslySeenPart->setDead(); }