X-Git-Url: http://plrg.eecs.uci.edu/git/?p=iotcloud.git;a=blobdiff_plain;f=version2%2Fsrc%2FC%2FTable.cc;h=531337cc5336f1c69dac6aed68018b2899355c98;hp=1bdc259945f940e1ec05bd43c9a482c597de0816;hb=b7ed1849727b50e226f3b9d1c432d3071d739368;hpb=9c3fa5cbce287df14626d262bd0179e994338869 diff --git a/version2/src/C/Table.cc b/version2/src/C/Table.cc index 1bdc259..531337c 100644 --- a/version2/src/C/Table.cc +++ b/version2/src/C/Table.cc @@ -10,7 +10,7 @@ #include "TransactionStatus.h" #include "Transaction.h" #include "LastMessage.h" -#include "Random.h" +#include "SecureRandom.h" #include "ByteBuffer.h" #include "Abort.h" #include "CommitPart.h" @@ -21,9 +21,9 @@ #include "SlotIndexer.h" #include -int compareInt64(const void * a, const void *b) { - const int64_t * pa = (const int64_t *) a; - const int64_t * pb = (const int64_t *) b; +int compareInt64(const void *a, const void *b) { + const int64_t *pa = (const int64_t *) a; + const int64_t *pb = (const int64_t *) b; if (*pa < *pb) return -1; else if (*pa > *pb) @@ -46,6 +46,7 @@ Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, i oldestLiveSlotSequenceNumver(1), localMachineId(_localMachineId), sequenceNumber(0), + localSequenceNumber(0), localTransactionSequenceNumber(0), lastTransactionSequenceNumberSpeculatedOn(0), oldestTransactionSequenceNumberSpeculatedOn(0), @@ -108,6 +109,7 @@ Table::Table(CloudComm *_cloud, int64_t _localMachineId) : oldestLiveSlotSequenceNumver(1), localMachineId(_localMachineId), sequenceNumber(0), + localSequenceNumber(0), localTransactionSequenceNumber(0), lastTransactionSequenceNumberSpeculatedOn(0), oldestTransactionSequenceNumberSpeculatedOn(0), @@ -156,22 +158,131 @@ Table::Table(CloudComm *_cloud, int64_t _localMachineId) : init(); } +Table::~Table() { + delete cloud; + delete random; + delete buffer; + // init data structs + delete committedKeyValueTable; + delete speculatedKeyValueTable; + delete pendingTransactionSpeculatedKeyValueTable; + delete liveNewKeyTable; + { + SetIterator *> *lmit = getKeyIterator(lastMessageTable); + while (lmit->hasNext()) { + Pair * pair = lastMessageTable->get(lmit->next()); + } + delete lmit; + delete lastMessageTable; + } + if (pendingTransactionBuilder != NULL) + delete pendingTransactionBuilder; + { + SetIterator *> *rmit = getKeyIterator(rejectedMessageWatchVectorTable); + while(rmit->hasNext()) { + int64_t machineid = rmit->next(); + Hashset * rmset = rejectedMessageWatchVectorTable->get(machineid); + SetIterator * mit = rmset->iterator(); + while (mit->hasNext()) { + RejectedMessage * rm = mit->next(); + delete rm; + } + delete mit; + delete rmset; + } + delete rmit; + delete rejectedMessageWatchVectorTable; + } + delete arbitratorTable; + delete liveAbortTable; + { + SetIterator *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts); + while (partsit->hasNext()) { + int64_t machineId = partsit->next(); + Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId); + delete parts; + } + delete partsit; + delete newTransactionParts; + } + { + SetIterator *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts); + while (partsit->hasNext()) { + int64_t machineId = partsit->next(); + Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId); + delete parts; + } + delete partsit; + delete newCommitParts; + } + delete lastArbitratedTransactionNumberByArbitratorTable; + delete liveTransactionBySequenceNumberTable; + delete liveTransactionByTransactionIdTable; + { + SetIterator *> *liveit = getKeyIterator(liveCommitsTable); + while (liveit->hasNext()) { + int64_t arbitratorId = liveit->next(); + + // Get all the commits for a specific arbitrator + Hashtable *commitForClientTable = liveCommitsTable->get(arbitratorId); + { + SetIterator *clientit = getKeyIterator(commitForClientTable); + while (clientit->hasNext()) { + int64_t id = clientit->next(); + delete commitForClientTable->get(id); + } + delete clientit; + } + + delete commitForClientTable; + } + delete liveit; + delete liveCommitsTable; + } + delete liveCommitsByKeyTable; + delete lastCommitSeenSequenceNumberByArbitratorTable; + delete rejectedSlotVector; + { + uint size = pendingTransactionQueue->size(); + for (uint iter = 0; iter < size; iter++) { + delete pendingTransactionQueue->get(iter); + } + delete pendingTransactionQueue; + } + delete pendingSendArbitrationEntriesToDelete; + delete transactionPartsSent; + delete outstandingTransactionStatus; + delete liveAbortsGeneratedByLocal; + delete offlineTransactionsCommittedAndAtServer; + delete localCommunicationTable; + delete lastTransactionSeenFromMachineFromServer; + { + for(uint i = 0; i < pendingSendArbitrationRounds->size(); i++) { + delete pendingSendArbitrationRounds->get(i); + } + delete pendingSendArbitrationRounds; + } + if (lastTransactionPartsSent != NULL) + delete lastTransactionPartsSent; + delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator; +} + /** * Init all the stuff needed for for table usage */ void Table::init() { // Init helper objects - random = new Random(); + random = new SecureRandom(); buffer = new SlotBuffer(); // init data structs - committedKeyValueTable = new Hashtable(); - speculatedKeyValueTable = new Hashtable(); - pendingTransactionSpeculatedKeyValueTable = new Hashtable(); - liveNewKeyTable = new Hashtable(); + committedKeyValueTable = new Hashtable(); + speculatedKeyValueTable = new Hashtable(); + pendingTransactionSpeculatedKeyValueTable = new Hashtable(); + liveNewKeyTable = new Hashtable(); lastMessageTable = new Hashtable * >(); rejectedMessageWatchVectorTable = new Hashtable * >(); - arbitratorTable = new Hashtable(); + arbitratorTable = new Hashtable(); liveAbortTable = new Hashtable *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>(); newTransactionParts = new Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>(); newCommitParts = new Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>(); @@ -179,7 +290,7 @@ void Table::init() { liveTransactionBySequenceNumberTable = new Hashtable(); liveTransactionByTransactionIdTable = new Hashtable *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>(); liveCommitsTable = new Hashtable * >(); - liveCommitsByKeyTable = new Hashtable(); + liveCommitsByKeyTable = new Hashtable(); lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable(); rejectedSlotVector = new Vector(); pendingTransactionQueue = new Vector(); @@ -217,10 +328,13 @@ void Table::initTable() { array->set(0, s); // update local block chain validateAndUpdate(array, true); + delete array; } else if (array->length() == 1) { // in case we did push the slot BUT we failed to init it validateAndUpdate(array, true); + delete array; } else { + delete array; throw new Error("Error on initialization"); } } @@ -233,6 +347,7 @@ void Table::rebuild() { // Just pull the latest slots from the server Array *newslots = cloud->getSlots(sequenceNumber + 1); validateAndUpdate(newslots, true); + delete newslots; sendToServer(NULL); updateLiveTransactionsAndStatus(); } @@ -246,14 +361,14 @@ int64_t Table::getArbitrator(IoTString *key) { } void Table::close() { - cloud->close(); + cloud->closeCloud(); } IoTString *Table::getCommitted(IoTString *key) { KeyValue *kv = committedKeyValueTable->get(key); if (kv != NULL) { - return kv->getValue(); + return new IoTString(kv->getValue()); } else { return NULL; } @@ -271,7 +386,7 @@ IoTString *Table::getSpeculative(IoTString *key) { } if (kv != NULL) { - return kv->getValue(); + return new IoTString(kv->getValue()); } else { return NULL; } @@ -292,7 +407,7 @@ IoTString *Table::getCommittedAtomic(IoTString *key) { if (kv != NULL) { pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue())); - return kv->getValue(); + return new IoTString(kv->getValue()); } else { pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL)); return NULL; @@ -322,7 +437,7 @@ IoTString *Table::getSpeculativeAtomic(IoTString *key) { if (kv != NULL) { pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue())); - return kv->getValue(); + return new IoTString(kv->getValue()); } else { pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL)); return NULL; @@ -333,6 +448,7 @@ bool Table::update() { try { Array *newSlots = cloud->getSlots(sequenceNumber + 1); validateAndUpdate(newSlots, false); + delete newSlots; sendToServer(NULL); updateLiveTransactionsAndStatus(); return true; @@ -350,7 +466,7 @@ bool Table::update() { bool Table::createNewKey(IoTString *keyName, int64_t machineId) { while (true) { - if (!arbitratorTable->contains(keyName)) { + if (arbitratorTable->contains(keyName)) { // There is already an arbitrator return false; } @@ -365,11 +481,12 @@ bool Table::createNewKey(IoTString *keyName, int64_t machineId) { void Table::startTransaction() { // Create a new transaction, invalidates any old pending transactions. + if (pendingTransactionBuilder != NULL) + delete pendingTransactionBuilder; pendingTransactionBuilder = new PendingTransaction(localMachineId); } -void Table::addKV(IoTString *key, IoTString *value) { - +void Table::put(IoTString *key, IoTString *value) { // Make sure it is a valid key if (!arbitratorTable->contains(key)) { throw new Error("Key not Found."); @@ -382,7 +499,7 @@ void Table::addKV(IoTString *key, IoTString *value) { } // Add the key value to this transaction - KeyValue *kv = new KeyValue(key, value); + KeyValue *kv = new KeyValue(new IoTString(key), new IoTString(value)); pendingTransactionBuilder->addKV(kv); } @@ -408,9 +525,12 @@ TransactionStatus *Table::commitTransaction() { pendingTransactionQueue->add(newTransaction); } else { arbitrateOnLocalTransaction(newTransaction); + delete newTransaction; updateLiveStateFromLocal(); } - + if (pendingTransactionBuilder != NULL) + delete pendingTransactionBuilder; + pendingTransactionBuilder = new PendingTransaction(localMachineId); try { @@ -420,7 +540,7 @@ TransactionStatus *Table::commitTransaction() { Hashset *arbitratorTriedAndFailed = new Hashset(); uint size = pendingTransactionQueue->size(); uint oldindex = 0; - for (int iter = 0; iter < size; iter++) { + for (uint iter = 0; iter < size; iter++) { Transaction *transaction = pendingTransactionQueue->get(iter); pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter)); @@ -440,6 +560,7 @@ TransactionStatus *Table::commitTransaction() { if (sendReturn.getSecond()) { // did arbitrate + delete transaction; oldindex--; } } @@ -464,242 +585,176 @@ int64_t Table::getLocalSequenceNumber() { return localSequenceNumber; } -bool Table::sendToServer(NewKey *newKey) { - bool fromRetry = false; - try { - if (hadPartialSendToServer) { - Array *newSlots = cloud->getSlots(sequenceNumber + 1); - if (newSlots->length() == 0) { - fromRetry = true; - ThreeTuple *> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey); - - if (sendSlotsReturn.getFirst()) { - if (newKey != NULL) { - if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) { - newKey = NULL; - } - } - - SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); - while (trit->hasNext()) { - Transaction *transaction = trit->next(); - transaction->resetServerFailure(); - // Update which transactions parts still need to be sent - transaction->removeSentParts(lastTransactionPartsSent->get(transaction)); - // Add the transaction status to the outstanding list - outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus()); - - // Update the transaction status - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial); - - // Check if all the transaction parts were successfully - // sent and if so then remove it from pending - if (transaction->didSendAllParts()) { - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully); - pendingTransactionQueue->remove(transaction); - } - } - delete trit; - } else { - newSlots = sendSlotsReturn.getThird(); - bool isInserted = false; - for (uint si = 0; si < newSlots->length(); si++) { - Slot *s = newSlots->get(si); - if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) { - isInserted = true; - break; - } - } - - for (uint si = 0; si < newSlots->length(); si++) { - Slot *s = newSlots->get(si); - if (isInserted) { - break; - } - - // Process each entry in the slot - Vector *ventries = s->getEntries(); - uint vesize = ventries->size(); - for (uint vei = 0; vei < vesize; vei++) { - Entry *entry = ventries->get(vei); - if (entry->getType() == TypeLastMessage) { - LastMessage *lastMessage = (LastMessage *)entry; - if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) { - isInserted = true; - break; - } - } - } - } - - if (isInserted) { - if (newKey != NULL) { - if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) { - newKey = NULL; - } - } - - SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); - while (trit->hasNext()) { - Transaction *transaction = trit->next(); - transaction->resetServerFailure(); - - // Update which transactions parts still need to be sent - transaction->removeSentParts(lastTransactionPartsSent->get(transaction)); - - // Add the transaction status to the outstanding list - outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus()); - - // Update the transaction status - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial); - - // Check if all the transaction parts were successfully sent and if so then remove it from pending - if (transaction->didSendAllParts()) { - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully); - pendingTransactionQueue->remove(transaction); - } else { - transaction->resetServerFailure(); - // Set the transaction sequence number back to nothing - if (!transaction->didSendAPartToServer()) { - transaction->setSequenceNumber(-1); - } - } - } - delete trit; +NewKey * Table::handlePartialSend(NewKey * newKey) { + //Didn't receive acknowledgement for last send + //See if the server has received a newer slot + + Array *newSlots = cloud->getSlots(sequenceNumber + 1); + if (newSlots->length() == 0) { + //Retry sending old slot + bool wasInserted = false; + bool sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey, &wasInserted, &newSlots); + + if (sendSlotsReturn) { + if (newKey != NULL) { + if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) { + newKey = NULL; + } + } + + SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); + transaction->resetServerFailure(); + // Update which transactions parts still need to be sent + transaction->removeSentParts(lastTransactionPartsSent->get(transaction)); + // Add the transaction status to the outstanding list + outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus()); + + // Update the transaction status + transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial); + + // Check if all the transaction parts were successfully + // sent and if so then remove it from pending + if (transaction->didSendAllParts()) { + transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully); + pendingTransactionQueue->remove(transaction); + } + } + delete trit; + } else { + if (checkSend(newSlots, lastSlotAttemptedToSend)) { + if (newKey != NULL) { + if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) { + newKey = NULL; } } - + SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); while (trit->hasNext()) { Transaction *transaction = trit->next(); transaction->resetServerFailure(); - // Set the transaction sequence number back to nothing - if (!transaction->didSendAPartToServer()) { - transaction->setSequenceNumber(-1); - } - } - delete trit; - - if (sendSlotsReturn.getThird()->length() != 0) { - // insert into the local block chain - validateAndUpdate(sendSlotsReturn.getThird(), true); - } - // continue; - } else { - bool isInserted = false; - for (uint si = 0; si < newSlots->length(); si++) { - Slot *s = newSlots->get(si); - if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) { - isInserted = true; - break; - } - } - - for (uint si = 0; si < newSlots->length(); si++) { - Slot *s = newSlots->get(si); - if (isInserted) { - break; - } - - // Process each entry in the slot - Vector *entries = s->getEntries(); - uint eSize = entries->size(); - for(uint ei=0; ei < eSize; ei++) { - Entry * entry = entries->get(ei); - - if (entry->getType() == TypeLastMessage) { - LastMessage *lastMessage = (LastMessage *)entry; - if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) { - isInserted = true; - break; - } - } - } - } - - if (isInserted) { - if (newKey != NULL) { - if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) { - newKey = NULL; - } - } - - SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); - while (trit->hasNext()) { - Transaction *transaction = trit->next(); - transaction->resetServerFailure(); - - // Update which transactions parts still need to be sent - transaction->removeSentParts(lastTransactionPartsSent->get(transaction)); - - // Add the transaction status to the outstanding list - outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus()); - - // Update the transaction status - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial); - - // Check if all the transaction parts were successfully sent and if so then remove it from pending - if (transaction->didSendAllParts()) { - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully); - pendingTransactionQueue->remove(transaction); - } else { - transaction->resetServerFailure(); - // Set the transaction sequence number back to nothing - if (!transaction->didSendAPartToServer()) { - transaction->setSequenceNumber(-1); - } - } - } - delete trit; - } else { - SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); - while (trit->hasNext()) { - Transaction *transaction = trit->next(); + + // Update which transactions parts still need to be sent + transaction->removeSentParts(lastTransactionPartsSent->get(transaction)); + + // Add the transaction status to the outstanding list + outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus()); + + // Update the transaction status + transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial); + + // Check if all the transaction parts were successfully sent and if so then remove it from pending + if (transaction->didSendAllParts()) { + transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully); + pendingTransactionQueue->remove(transaction); + } else { transaction->resetServerFailure(); // Set the transaction sequence number back to nothing if (!transaction->didSendAPartToServer()) { transaction->setSequenceNumber(-1); } } - delete trit; } - - // insert into the local block chain - validateAndUpdate(newSlots, true); + delete trit; } } - } catch (ServerException *e) { - throw e; + + SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); + transaction->resetServerFailure(); + // Set the transaction sequence number back to nothing + if (!transaction->didSendAPartToServer()) { + transaction->setSequenceNumber(-1); + } + } + delete trit; + + if (newSlots->length() != 0) { + // insert into the local block chain + validateAndUpdate(newSlots, true); + } + } else { + if (checkSend(newSlots, lastSlotAttemptedToSend)) { + if (newKey != NULL) { + if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) { + newKey = NULL; + } + } + + SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); + transaction->resetServerFailure(); + + // Update which transactions parts still need to be sent + transaction->removeSentParts(lastTransactionPartsSent->get(transaction)); + + // Add the transaction status to the outstanding list + outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus()); + + // Update the transaction status + transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial); + + // Check if all the transaction parts were successfully sent and if so then remove it from pending + if (transaction->didSendAllParts()) { + transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully); + pendingTransactionQueue->remove(transaction); + } else { + transaction->resetServerFailure(); + // Set the transaction sequence number back to nothing + if (!transaction->didSendAPartToServer()) { + transaction->setSequenceNumber(-1); + } + } + } + delete trit; + } else { + SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); + transaction->resetServerFailure(); + // Set the transaction sequence number back to nothing + if (!transaction->didSendAPartToServer()) { + transaction->setSequenceNumber(-1); + } + } + delete trit; + } + + // insert into the local block chain + validateAndUpdate(newSlots, true); } + delete newSlots; + return newKey; +} - +bool Table::sendToServer(NewKey *newKey) { + if (hadPartialSendToServer) { + newKey = handlePartialSend(newKey); + } try { // While we have stuff that needs inserting into the block chain while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) { - - fromRetry = false; - if (hadPartialSendToServer) { throw new Error("Should Be error free"); } - - - + // If there is a new key with same name then end if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) { return false; } // Create the slot - Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber); + Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, new Array(buffer->getSlot(sequenceNumber)->getHMAC()), localSequenceNumber); localSequenceNumber++; // Try to fill the slot with data - ThreeTuple fillSlotsReturn = fillSlot(slot, false, newKey); - bool needsResize = fillSlotsReturn.getFirst(); - int newSize = fillSlotsReturn.getSecond(); - bool insertedNewKey = fillSlotsReturn.getThird(); + int newSize = 0; + bool insertedNewKey = false; + bool needsResize = fillSlot(slot, false, newKey, newSize, insertedNewKey); if (needsResize) { // Reset which transaction to send @@ -720,7 +775,7 @@ bool Table::sendToServer(NewKey *newKey) { transactionPartsSent->clear(); // We needed a resize so try again - fillSlot(slot, true, newKey); + fillSlot(slot, true, newKey, newSize, insertedNewKey); } lastSlotAttemptedToSend = slot; @@ -728,18 +783,19 @@ bool Table::sendToServer(NewKey *newKey) { lastInsertedNewKey = insertedNewKey; lastNewSize = newSize; lastNewKey = newKey; + if (lastTransactionPartsSent != NULL) + delete lastTransactionPartsSent; lastTransactionPartsSent = transactionPartsSent->clone(); lastPendingSendArbitrationEntriesToDelete = new Vector(pendingSendArbitrationEntriesToDelete); - ThreeTuple *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL); - - if (sendSlotsReturn.getFirst()) { + Array * newSlots = NULL; + bool wasInserted = false; + bool sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL, &wasInserted, &newSlots); + if (sendSlotsReturn) { // Did insert into the block chain - if (insertedNewKey) { // This slot was what was inserted not a previous slot - // New Key was successfully inserted into the block chain so dont want to insert it again newKey = NULL; } @@ -752,10 +808,11 @@ bool Table::sendToServer(NewKey *newKey) { round->removeParts(pendingSendArbitrationEntriesToDelete); if (!round->isDoneSending()) { - // Sent all the parts + //Add part back in pendingSendArbitrationRounds->set(oldcount++, pendingSendArbitrationRounds->get(i)); - } + } else + delete pendingSendArbitrationRounds->get(i); } pendingSendArbitrationRounds->setSize(oldcount); @@ -799,12 +856,12 @@ bool Table::sendToServer(NewKey *newKey) { pendingSendArbitrationEntriesToDelete->clear(); transactionPartsSent->clear(); - if (sendSlotsReturn.getThird()->length() != 0) { + if (newSlots->length() != 0) { // insert into the local block chain - validateAndUpdate(sendSlotsReturn.getThird(), true); + validateAndUpdate(newSlots, true); } + delete newSlots; } - } catch (ServerException *e) { if (e->getType() != ServerException_TypeInputTimeout) { // Nothing was able to be sent to the server so just clear these data structures @@ -846,7 +903,7 @@ bool Table::updateFromLocal(int64_t machineId) { if (!localCommunicationTable->contains(machineId)) return false; - Pair * localCommunicationInformation = localCommunicationTable->get(machineId); + Pair *localCommunicationInformation = localCommunicationTable->get(machineId); // Get the size of the send data int sendDataSize = sizeof(int32_t) + sizeof(int64_t); @@ -897,16 +954,16 @@ Pair Table::sendTransactionToLocal(Transaction *transaction) { // Get the devices local communications if (!localCommunicationTable->contains(transaction->getArbitrator())) return Pair(true, false); - - Pair * localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator()); + + Pair *localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator()); // Get the size of the send data int sendDataSize = sizeof(int32_t) + sizeof(int64_t); { - Vector * tParts = transaction->getParts(); + Vector *tParts = transaction->getParts(); uint tPartsSize = tParts->size(); for (uint i = 0; i < tPartsSize; i++) { - TransactionPart * part = tParts->get(i); + TransactionPart *part = tParts->get(i); sendDataSize += part->getSize(); } } @@ -924,10 +981,10 @@ Pair Table::sendTransactionToLocal(Transaction *transaction) { bbEncode->putLong(lastArbitrationDataLocalSequenceNumber); bbEncode->putInt(transaction->getParts()->size()); { - Vector * tParts = transaction->getParts(); + Vector *tParts = transaction->getParts(); uint tPartsSize = tParts->size(); for (uint i = 0; i < tPartsSize; i++) { - TransactionPart * part = tParts->get(i); + TransactionPart *part = tParts->get(i); part->encode(bbEncode); } } @@ -967,14 +1024,14 @@ Pair Table::sendTransactionToLocal(Transaction *transaction) { updateLiveStateFromLocal(); if (couldArbitrate) { - TransactionStatus * status = transaction->getTransactionStatus(); + TransactionStatus *status = transaction->getTransactionStatus(); if (didCommit) { status->setStatus(TransactionStatus_StatusCommitted); } else { status->setStatus(TransactionStatus_StatusAborted); } } else { - TransactionStatus * status = transaction->getTransactionStatus(); + TransactionStatus *status = transaction->getTransactionStatus(); if (foundAbort) { status->setStatus(TransactionStatus_StatusAborted); } else { @@ -1026,20 +1083,20 @@ Array *Table::acceptDataFromLocal(Array *data) { Vector *abortLocalSequenceNumbers = new Vector(); { SetIterator *abortit = getKeyIterator(liveAbortsGeneratedByLocal); - while(abortit->hasNext()) + while (abortit->hasNext()) abortLocalSequenceNumbers->add(abortit->next()); delete abortit; } - + qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64); uint asize = abortLocalSequenceNumbers->size(); - for(uint i=0; iget(i); if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) { continue; } - + Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber); unseenArbitrations->add(abort); returnDataSize += abort->getSize(); @@ -1051,14 +1108,14 @@ Array *Table::acceptDataFromLocal(Array *data) { Vector *commitLocalSequenceNumbers = new Vector(); { SetIterator *commitit = getKeyIterator(commitForClientTable); - while(commitit->hasNext()) + while (commitit->hasNext()) commitLocalSequenceNumbers->add(commitit->next()); delete commitit; } qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64); uint clsSize = commitLocalSequenceNumbers->size(); - for(uint clsi = 0; clsi < clsSize; clsi++) { + for (uint clsi = 0; clsi < clsSize; clsi++) { int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi); Commit *commit = commitForClientTable->get(localSequenceNumber); @@ -1067,10 +1124,10 @@ Array *Table::acceptDataFromLocal(Array *data) { } { - Vector * parts = commit->getParts(); + Vector *parts = commit->getParts(); uint nParts = parts->size(); - for(uint i=0; iget(i); + for (uint i = 0; i < nParts; i++) { + CommitPart *commitPart = parts->get(i); unseenArbitrations->add(commitPart); returnDataSize += commitPart->getSize(); } @@ -1114,80 +1171,82 @@ Array *Table::acceptDataFromLocal(Array *data) { return returnData; } -ThreeTuple *> Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey) { - bool attemptedToSendToServerTmp = attemptedToSendToServer; - attemptedToSendToServer = true; - - bool inserted = false; - bool lastTryInserted = false; +/** Checks whether a given slot was sent using new slots in + array. Returns true if sent and false otherwise. */ - Array *array = cloud->putSlot(slot, newSize); - if (array == NULL) { - array = new Array(); - array->set(0, slot); - rejectedSlotVector->clear(); - inserted = true; - } else { - if (array->length() == 0) { - throw new Error("Server Error: Did not send any slots"); +bool Table::checkSend(Array * array, Slot *checkSlot) { + uint size = array->length(); + for (uint i = 0; i < size; i++) { + Slot *s = array->get(i); + if ((s->getSequenceNumber() == checkSlot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) { + return true; } - - // if (attemptedToSendToServerTmp) { - if (hadPartialSendToServer) { - - bool isInserted = false; - uint size = array->length(); - for (uint i = 0; i < size; i++) { - Slot *s = array->get(i); - if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) { - isInserted = true; - break; + } + + //Also need to see if other machines acknowledged our message + for (uint i = 0; i < size; i++) { + Slot *s = array->get(i); + + // Process each entry in the slot + Vector *entries = s->getEntries(); + uint eSize = entries->size(); + for (uint ei = 0; ei < eSize; ei++) { + Entry *entry = entries->get(ei); + + if (entry->getType() == TypeLastMessage) { + LastMessage *lastMessage = (LastMessage *)entry; + + if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == checkSlot->getSequenceNumber())) { + return true; } } + } + } + //Not found + return false; +} - for (uint i = 0; i < size; i++) { - Slot *s = array->get(i); - if (isInserted) { - break; - } +/** Method tries to send slot to server. Returns status in tuple. + isInserted returns whether last un-acked send (if any) was + successful. Returns whether send was confirmed.x + */ - // Process each entry in the slot - Vector *entries = s->getEntries(); - uint eSize = entries->size(); - for(uint ei=0; ei < eSize; ei++) { - Entry * entry = entries->get(ei); +bool Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey, bool *isInserted, Array **array) { + attemptedToSendToServer = true; - if (entry->getType() == TypeLastMessage) { - LastMessage *lastMessage = (LastMessage *)entry; + *array = cloud->putSlot(slot, newSize); + if (*array == NULL) { + *array = new Array(1); + (*array)->set(0, slot); + rejectedSlotVector->clear(); + *isInserted = false; + return true; + } else { + if ((*array)->length() == 0) { + throw new Error("Server Error: Did not send any slots"); + } - if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) { - isInserted = true; - break; - } - } - } - } + if (hadPartialSendToServer) { + *isInserted = checkSend(*array, slot); - if (!isInserted) { + if (!(*isInserted)) { rejectedSlotVector->add(slot->getSequenceNumber()); - lastTryInserted = false; - } else { - lastTryInserted = true; } + + return false; } else { rejectedSlotVector->add(slot->getSequenceNumber()); - lastTryInserted = false; + *isInserted = false; + return false; } } - - return ThreeTuple *>(inserted, lastTryInserted, array); } /** - * Returns false if a resize was needed + * Returns true if a resize was needed but not done. */ -ThreeTuple Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) { - int newSize = 0; +bool Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry, int & newSize, bool & insertedKey) { + newSize = 0;//special value to indicate no resize if (liveSlotCount > bufferResizeThreshold) { resize = true;//Resize is forced } @@ -1210,16 +1269,16 @@ ThreeTuple Table::fillSlot(Slot *slot, bool resize, NewKey int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird(); if (needsResize && !resize) { - // We need to resize but we are not resizing so return false - return ThreeTuple(true, NULL, NULL); + // We need to resize but we are not resizing so return true to force on retry + return true; } - bool inserted = false; + insertedKey = false; if (newKeyEntry != NULL) { newKeyEntry->setSlot(slot); if (slot->hasSpace(newKeyEntry)) { slot->addEntry(newKeyEntry); - inserted = true; + insertedKey = true; } } @@ -1291,7 +1350,7 @@ ThreeTuple Table::fillSlot(Slot *slot, bool resize, NewKey // Fill the remainder of the slot with rescue data doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize); - return ThreeTuple(false, newSize, inserted); + return false; } void Table::doRejectedMessages(Slot *s) { @@ -1307,7 +1366,7 @@ void Table::doRejectedMessages(Slot *s) { s->addEntry(rm); } else { int64_t prev_seqn = -1; - int i = 0; + uint i = 0; /* Go through list of missing messages */ for (; i < rejectedSlotVector->size(); i++) { int64_t curr_seqn = rejectedSlotVector->get(i); @@ -1348,7 +1407,7 @@ ThreeTuple Table::doMandatoryResuce(Slot *slot, bool resize // Mandatory Rescue for (; currentSequenceNumber < threshold; currentSequenceNumber++) { - Slot * previousSlot = buffer->getSlot(currentSequenceNumber); + Slot *previousSlot = buffer->getSlot(currentSequenceNumber); // Push slot number forward if (!seenLiveSlot) { oldestLiveSlotSequenceNumver = currentSequenceNumber; @@ -1366,8 +1425,8 @@ ThreeTuple Table::doMandatoryResuce(Slot *slot, bool resize // Iterate over all the live entries and try to rescue them uint lESize = liveEntries->size(); - for (uint i=0; i< lESize; i++) { - Entry * liveEntry = liveEntries->get(i); + for (uint i = 0; i < lESize; i++) { + Entry *liveEntry = liveEntries->get(i); if (slot->hasSpace(liveEntry)) { // Enough space to rescue the entry slot->addEntry(liveEntry); @@ -1388,7 +1447,6 @@ void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resi * for SKIP_THRESHOLD consecutive entries*/ int skipcount = 0; int64_t newestseqnum = buffer->getNewestSeqNum(); -search: for (; seqn <= newestseqnum; seqn++) { Slot *prevslot = buffer->getSlot(seqn); //Push slot number forward @@ -1400,18 +1458,21 @@ search: seenliveslot = true; Vector *liveentries = prevslot->getLiveEntries(resize); uint lESize = liveentries->size(); - for (uint i=0; i< lESize; i++) { - Entry * liveentry = liveentries->get(i); + for (uint i = 0; i < lESize; i++) { + Entry *liveentry = liveentries->get(i); if (s->hasSpace(liveentry)) s->addEntry(liveentry); else { skipcount++; - if (skipcount > Table_SKIP_THRESHOLD) + if (skipcount > Table_SKIP_THRESHOLD) { + delete liveentries; goto donesearch; + } } } + delete liveentries; } - donesearch: +donesearch: ; } @@ -1442,8 +1503,8 @@ void Table::validateAndUpdate(Array *newSlots, bool acceptUpdatesToLocal // Set to keep track of messages from clients Hashset *machineSet = new Hashset(); { - SetIterator *> * lmit=getKeyIterator(lastMessageTable); - while(lmit->hasNext()) + SetIterator *> *lmit = getKeyIterator(lastMessageTable); + while (lmit->hasNext()) machineSet->add(lmit->next()); delete lmit; } @@ -1451,13 +1512,14 @@ void Table::validateAndUpdate(Array *newSlots, bool acceptUpdatesToLocal // Process each slots data { uint numSlots = newSlots->length(); - for(uint i=0; iget(i); processSlot(indexer, slot, acceptUpdatesToLocal, machineSet); updateExpectedSize(); } } - + delete indexer; + // If there is a gap, check to see if the server sent us // everything-> if (firstSeqNum != (sequenceNumber + 1)) { @@ -1470,22 +1532,23 @@ void Table::validateAndUpdate(Array *newSlots, bool acceptUpdatesToLocal // must have a last message message-> If not then the server is // hiding slots if (!machineSet->isEmpty()) { + delete machineSet; throw new Error("Missing record for machines: "); } } - + delete machineSet; // Update the size of our local block chain-> commitNewMaxSize(); // Commit new to slots to the local block chain-> { uint numSlots = newSlots->length(); - for(uint i=0; iget(i); - + // Insert this slot into our local block chain copy-> buffer->putSlot(slot); - + // Keep track of how many slots are currently live (have live data // in them)-> liveSlotCount++; @@ -1598,15 +1661,15 @@ void Table::processNewTransactionParts() { // Iterate through all the machine Ids that we received new parts // for - SetIterator *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> * tpit= getKeyIterator(newTransactionParts); - while(tpit->hasNext()) { + SetIterator *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *tpit = getKeyIterator(newTransactionParts); + while (tpit->hasNext()) { int64_t machineId = tpit->next(); Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId); SetIterator *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts); // Iterate through all the parts for that machine Id - while(ptit->hasNext()) { - Pair * partId = ptit->next(); + while (ptit->hasNext()) { + Pair *partId = ptit->next(); TransactionPart *part = parts->get(partId); if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) { @@ -1617,53 +1680,66 @@ void Table::processNewTransactionParts() { continue; } } - + // Get the transaction object for that sequence number Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber()); if (transaction == NULL) { // This is a new transaction that we dont have so make a new one transaction = new Transaction(); + + // Add that part to the transaction + transaction->addPartDecode(part); // Insert this new transaction into the live tables liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction); - liveTransactionByTransactionIdTable->put(new Pair(part->getTransactionId()), transaction); + liveTransactionByTransactionIdTable->put(transaction->getId(), transaction); } - - // Add that part to the transaction - transaction->addPartDecode(part); } delete ptit; } delete tpit; // Clear all the new transaction parts in preparation for the next // time the server sends slots - newTransactionParts->clear(); + { + SetIterator *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts); + while (partsit->hasNext()) { + int64_t machineId = partsit->next(); + Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId); + delete parts; + } + delete partsit; + newTransactionParts->clear(); + } } void Table::arbitrateFromServer() { - if (liveTransactionBySequenceNumberTable->size() == 0) { // Nothing to arbitrate on so move on return; } // Get the transaction sequence numbers and sort from oldest to newest - Vector *transactionSequenceNumbers = new Vector(liveTransactionBySequenceNumberTable->keySet()); + Vector *transactionSequenceNumbers = new Vector(); + { + SetIterator *trit = getKeyIterator(liveTransactionBySequenceNumberTable); + while (trit->hasNext()) + transactionSequenceNumbers->add(trit->next()); + delete trit; + } qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64); // Collection of key value pairs that are - Hashtable * speculativeTableTmp = new Hashtable(); + Hashtable *speculativeTableTmp = new Hashtable(); // The last transaction arbitrated on int64_t lastTransactionCommitted = -1; Hashset *generatedAborts = new Hashset(); - - for (int64_t transactionSequenceNumber : transactionSequenceNumbers) { + uint tsnSize = transactionSequenceNumbers->size(); + for (uint i = 0; i < tsnSize; i++) { + int64_t transactionSequenceNumber = transactionSequenceNumbers->get(i); Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber); - - // Check if this machine arbitrates for this transaction if not // then we cant arbitrate this transaction if (transaction->getArbitrator() != localMachineId) { @@ -1679,16 +1755,14 @@ void Table::arbitrateFromServer() { continue; } - if (!transaction->isComplete()) { // Will arbitrate in incorrect order if we continue so just break // Most likely this break; } - // update the largest transaction seen by arbitrator from server - if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) == NULL) { + if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) { lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber()); } else { int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()); @@ -1699,7 +1773,6 @@ void Table::arbitrateFromServer() { if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) { // Guard evaluated as true - // Update the local changes so we can make the commit SetIterator *kvit = transaction->getKeyValueUpdateSet()->iterator(); while (kvit->hasNext()) { @@ -1707,7 +1780,7 @@ void Table::arbitrateFromServer() { speculativeTableTmp->put(kv->getKey(), kv); } delete kvit; - + // Update what the last transaction committed was for use in batch commit lastTransactionCommitted = transactionSequenceNumber; } else { @@ -1738,20 +1811,28 @@ void Table::arbitrateFromServer() { localArbitrationSequenceNumber++; // Add all the new keys to the commit - for (KeyValue *kv : speculativeTableTmp->values()) { + SetIterator *spit = getKeyIterator(speculativeTableTmp); + while (spit->hasNext()) { + IoTString *string = spit->next(); + KeyValue *kv = speculativeTableTmp->get(string); newCommit->addKV(kv); } - + delete spit; + // create the commit parts newCommit->createCommitParts(); // Append all the commit parts to the end of the pending queue // waiting for sending to the server // Insert the commit so we can process it - for (CommitPart *commitPart : newCommit->getParts()->values()) { + Vector *parts = newCommit->getParts(); + uint partsSize = parts->size(); + for (uint i = 0; i < partsSize; i++) { + CommitPart *commitPart = parts->get(i); processEntry(commitPart); } } + delete speculativeTableTmp; if ((newCommit != NULL) || (generatedAborts->size() > 0)) { ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts); @@ -1760,7 +1841,10 @@ void Table::arbitrateFromServer() { if (compactArbitrationData()) { ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); if (newArbitrationRound->getCommit() != NULL) { - for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) { + Vector *parts = newArbitrationRound->getCommit()->getParts(); + uint partsSize = parts->size(); + for (uint i = 0; i < partsSize; i++) { + CommitPart *commitPart = parts->get(i); processEntry(commitPart); } } @@ -1784,7 +1868,7 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { if (transaction->getMachineId() != localMachineId) { // dont do this check for local transactions - if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) != NULL) { + if (lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) { if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) { // We've have already seen this from the server return Pair(false, false); @@ -1805,7 +1889,7 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { newCommit->addKV(kv); } delete kvit; - + // create the commit parts newCommit->createCommitParts(); @@ -1816,12 +1900,18 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { if (compactArbitrationData()) { ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); - for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) { + Vector *parts = newArbitrationRound->getCommit()->getParts(); + uint partsSize = parts->size(); + for (uint i = 0; i < partsSize; i++) { + CommitPart *commitPart = parts->get(i); processEntry(commitPart); } } else { // Insert the commit so we can process it - for (CommitPart *commitPart : newCommit->getParts()->values()) { + Vector *parts = newCommit->getParts(); + uint partsSize = parts->size(); + for (uint i = 0; i < partsSize; i++) { + CommitPart *commitPart = parts->get(i); processEntry(commitPart); } } @@ -1839,7 +1929,7 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { if (transaction->getMachineId() == localMachineId) { // For locally created messages update the status // Guard evaluated was false so create abort - TransactionStatus * status = transaction->getTransactionStatus(); + TransactionStatus *status = transaction->getTransactionStatus(); if (status != NULL) { status->setStatus(TransactionStatus_StatusAborted); } @@ -1863,7 +1953,11 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { if (compactArbitrationData()) { ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); - for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) { + + Vector *parts = newArbitrationRound->getCommit()->getParts(); + uint partsSize = parts->size(); + for (uint i = 0; i < partsSize; i++) { + CommitPart *commitPart = parts->get(i); processEntry(commitPart); } } @@ -1875,7 +1969,7 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { } /** - * Compacts the arbitration data my merging commits and aggregating + * Compacts the arbitration data by merging commits and aggregating * aborts so that a single large push of commits can be done instead * of many small updates */ @@ -1893,7 +1987,8 @@ bool Table::compactArbitrationData() { bool hadCommit = (lastRound->getCommit() == NULL); bool gotNewCommit = false; - int numberToDelete = 1; + uint numberToDelete = 1; + while (numberToDelete < pendingSendArbitrationRounds->size()) { ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1); @@ -1913,7 +2008,7 @@ bool Table::compactArbitrationData() { lastRound->addAborts(round->getAborts()); } else { // Create a new larger commit - Commit * newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber); + Commit *newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber); localArbitrationSequenceNumber++; // Create the commit parts so that we can count them @@ -1925,11 +2020,23 @@ bool Table::compactArbitrationData() { newSize += round->getAbortsCount(); if (newSize > ArbitrationRound_MAX_PARTS) { - // Cant compact since it would be too large + // Can't compact since it would be too large + if (lastRound->getCommit() != newCommit && + round->getCommit() != newCommit) + delete newCommit; break; } - // Set the new compacted part + if (lastRound->getCommit() == newCommit) + lastRound->setCommit(NULL); + if (round->getCommit() == newCommit) + round->setCommit(NULL); + + if (lastRound->getCommit() != NULL) { + Commit * oldcommit = lastRound->getCommit(); + lastRound->setCommit(NULL); + delete oldcommit; + } lastRound->setCommit(newCommit); lastRound->addAborts(round->getAborts()); gotNewCommit = true; @@ -1941,15 +2048,11 @@ bool Table::compactArbitrationData() { if (numberToDelete != 1) { // If there is a compaction // Delete the previous pieces that are now in the new compacted piece - if (numberToDelete == pendingSendArbitrationRounds->size()) { - pendingSendArbitrationRounds->clear(); - } else { - for (int i = 0; i < numberToDelete; i++) { - pendingSendArbitrationRounds->removeIndex(pendingSendArbitrationRounds->size() - 1); - } + for (uint i = 2; i <= numberToDelete; i++) { + delete pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size()-i); } + pendingSendArbitrationRounds->setSize(pendingSendArbitrationRounds->size() - numberToDelete); - // Add the new compacted into the pending to send list pendingSendArbitrationRounds->add(lastRound); // Should reinsert into the commit processor @@ -1966,18 +2069,21 @@ bool Table::compactArbitrationData() { * transactions */ bool Table::updateCommittedTable() { - if (newCommitParts->size() == 0) { // Nothing new to process return false; } // Iterate through all the machine Ids that we received new parts for - for (int64_t machineId : newCommitParts->keySet()) { - Hashtable *, CommitPart *> *parts = newCommitParts->get(machineId); + SetIterator *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts); + while (partsit->hasNext()) { + int64_t machineId = partsit->next(); + Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId); // Iterate through all the parts for that machine Id - for (Pair partId : parts->keySet()) { + SetIterator *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pairit = getKeyIterator(parts); + while (pairit->hasNext()) { + Pair *partId = pairit->next(); CommitPart *part = parts->get(partId); // Get the transaction object for that sequence number @@ -2002,7 +2108,10 @@ bool Table::updateCommittedTable() { // Add that part to the commit commit->addPartDecode(part); } + delete pairit; + delete parts; } + delete partsit; // Clear all the new commits parts in preparation for the next time // the server sends slots @@ -2012,26 +2121,33 @@ bool Table::updateCommittedTable() { bool didProcessANewCommit = false; // Process the commits one by one - for (int64_T arbitratorId : liveCommitsTable->keySet()) { - + SetIterator *> *liveit = getKeyIterator(liveCommitsTable); + while (liveit->hasNext()) { + int64_t arbitratorId = liveit->next(); // Get all the commits for a specific arbitrator Hashtable *commitForClientTable = liveCommitsTable->get(arbitratorId); // Sort the commits in order - Vector *commitSequenceNumbers = new Vector(commitForClientTable->keySet()); + Vector *commitSequenceNumbers = new Vector(); + { + SetIterator *clientit = getKeyIterator(commitForClientTable); + while (clientit->hasNext()) + commitSequenceNumbers->add(clientit->next()); + delete clientit; + } + qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64); // Get the last commit seen from this arbitrator int64_t lastCommitSeenSequenceNumber = -1; - if (lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId) != NULL) { + if (lastCommitSeenSequenceNumberByArbitratorTable->contains(arbitratorId)) { lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId); } // Go through each new commit one by one - for (int i = 0; i < commitSequenceNumbers->size(); i++) { + for (uint i = 0; i < commitSequenceNumbers->size(); i++) { int64_t commitSequenceNumber = commitSequenceNumbers->get(i); Commit *commit = commitForClientTable->get(commitSequenceNumber); - // Special processing if a commit is not complete if (!commit->isComplete()) { if (i == (commitSequenceNumbers->size() - 1)) { @@ -2045,16 +2161,15 @@ bool Table::updateCommittedTable() { // Delete it and move on commit->setDead(); commitForClientTable->remove(commit->getSequenceNumber()); + delete commit; continue; } } // Update the last transaction that was updated if we can if (commit->getTransactionSequenceNumber() != -1) { - int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()); - - // Update the last transaction sequence number that the arbitrator arbitrated on - if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) { + // Update the last transaction sequence number that the arbitrator arbitrated on1 + if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) { lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber()); } } @@ -2074,17 +2189,14 @@ bool Table::updateCommittedTable() { // We have already seen this commit before so need to do the // full processing on this commit if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) { - // Update the last transaction that was updated if we can if (commit->getTransactionSequenceNumber() != -1) { int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()); - - // Update the last transaction sequence number that the arbitrator arbitrated on - if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) { + if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || + lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) { lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber()); } } - continue; } @@ -2094,40 +2206,46 @@ bool Table::updateCommittedTable() { // have live values for their keys Hashset *commitsToEdit = new Hashset(); { - SetIterator *kvit = commit->getKeyValueUpdateSet()->iterator(); + SetIterator *kvit = commit->getKeyValueUpdateSet()->iterator(); while (kvit->hasNext()) { KeyValue *kv = kvit->next(); - commitsToEdit->add(liveCommitsByKeyTable->get(kv->getKey())); + Commit *commit = liveCommitsByKeyTable->get(kv->getKey()); + if (commit != NULL) + commitsToEdit->add(commit); } delete kvit; } - commitsToEdit->remove(NULL); // remove NULL since it could be in this set // Update each previous commit that needs to be updated - for (Commit *previousCommit : commitsToEdit) { + SetIterator *commitit = commitsToEdit->iterator(); + while (commitit->hasNext()) { + Commit *previousCommit = commitit->next(); // Only bother with live commits (TODO: Maybe remove this check) if (previousCommit->isLive()) { // Update which keys in the old commits are still live { - SetIterator *kvit = commit->getKeyValueUpdateSet()->iterator(); + SetIterator *kvit = commit->getKeyValueUpdateSet()->iterator(); while (kvit->hasNext()) { KeyValue *kv = kvit->next(); previousCommit->invalidateKey(kv->getKey()); } delete kvit; } - + // if the commit is now dead then remove it if (!previousCommit->isLive()) { - commitForClientTable->remove(previousCommit); + commitForClientTable->remove(previousCommit->getSequenceNumber()); + delete previousCommit; } } } + delete commitit; + delete commitsToEdit; // Update the last seen sequence number from this arbitrator - if (lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId()) != NULL) { + if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) { if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) { lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber()); } @@ -2140,7 +2258,7 @@ bool Table::updateCommittedTable() { // Update the committed table of keys and which commit is using which key { - SetIterator *kvit = commit->getKeyValueUpdateSet()->iterator(); + SetIterator *kvit = commit->getKeyValueUpdateSet()->iterator(); while (kvit->hasNext()) { KeyValue *kv = kvit->next(); committedKeyValueTable->put(kv->getKey(), kv); @@ -2150,6 +2268,7 @@ bool Table::updateCommittedTable() { } } } + delete liveit; return didProcessANewCommit; } @@ -2159,15 +2278,22 @@ bool Table::updateCommittedTable() { * and have come from the cloud */ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { - if (liveTransactionBySequenceNumberTable->keySet()->size() == 0) { + if (liveTransactionBySequenceNumberTable->size() == 0) { // There is nothing to speculate on return false; } // Create a list of the transaction sequence numbers and sort them // from oldest to newest - Vector *transactionSequenceNumbersSorted = new Vector(liveTransactionBySequenceNumberTable->keySet()); - qsort(transactionSequenceNumberSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64); + Vector *transactionSequenceNumbersSorted = new Vector(); + { + SetIterator *trit = getKeyIterator(liveTransactionBySequenceNumberTable); + while (trit->hasNext()) + transactionSequenceNumbersSorted->add(trit->next()); + delete trit; + } + + qsort(transactionSequenceNumbersSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64); bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn; @@ -2182,14 +2308,18 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { speculatedKeyValueTable->clear(); lastTransactionSequenceNumberSpeculatedOn = -1; oldestTransactionSequenceNumberSpeculatedOn = -1; - } // Remember the front of the transaction list oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0); // Find where to start arbitration from - int startIndex = transactionSequenceNumbersSorted->indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1; + uint startIndex = 0; + + for (; startIndex < transactionSequenceNumbersSorted->size(); startIndex++) + if (transactionSequenceNumbersSorted->get(startIndex) == lastTransactionSequenceNumberSpeculatedOn) + break; + startIndex++; if (startIndex >= transactionSequenceNumbersSorted->size()) { // Make sure we are not out of bounds @@ -2199,7 +2329,7 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { Hashset *incompleteTransactionArbitrator = new Hashset(); bool didSkip = true; - for (int i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) { + for (uint i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) { int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i); Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber); @@ -2259,14 +2389,18 @@ void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOr } // Find where to start arbitration from - int startIndex = pendingTransactionQueue->indexOf(firstPendingTransaction) + 1; + uint startIndex = 0; + + for (; startIndex < pendingTransactionQueue->size(); startIndex++) + if (pendingTransactionQueue->get(startIndex) == firstPendingTransaction) + break; if (startIndex >= pendingTransactionQueue->size()) { // Make sure we are not out of bounds return; } - for (int i = startIndex; i < pendingTransactionQueue->size(); i++) { + for (uint i = startIndex; i < pendingTransactionQueue->size(); i++) { Transaction *transaction = pendingTransactionQueue->get(i); lastPendingTransactionSpeculatedOn = transaction; @@ -2288,38 +2422,46 @@ void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOr * transactions that are dead */ void Table::updateLiveTransactionsAndStatus() { - // Go through each of the transactions - for (IteratorEntry > *iter = liveTransactionBySequenceNumberTable->entrySet()->iterator(); iter->hasNext();) { - Transaction *transaction = iter->next()->getValue(); - - // Check if the transaction is dead - int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator()); - if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction->getSequenceNumber())) { - - // Set dead the transaction - transaction->setDead(); - - // Remove the transaction from the live table - iter->remove(); - liveTransactionByTransactionIdTable->remove(transaction->getId()); + { + SetIterator *iter = getKeyIterator(liveTransactionBySequenceNumberTable); + while (iter->hasNext()) { + int64_t key = iter->next(); + Transaction *transaction = liveTransactionBySequenceNumberTable->get(key); + + // Check if the transaction is dead + if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator()) + && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator()) >= transaction->getSequenceNumber()) { + // Set dead the transaction + transaction->setDead(); + + // Remove the transaction from the live table + iter->remove(); + liveTransactionByTransactionIdTable->remove(transaction->getId()); + delete transaction; + } } + delete iter; } // Go through each of the transactions - for (IteratorEntry > *iter = outstandingTransactionStatus->entrySet()->iterator(); iter->hasNext();) { - TransactionStatus *status = iter->next()->getValue(); - - // Check if the transaction is dead - int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()); - if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status->getTransactionSequenceNumber())) { - - // Set committed - status->setStatus(TransactionStatus_StatusCommitted); + { + SetIterator *iter = getKeyIterator(outstandingTransactionStatus); + while (iter->hasNext()) { + int64_t key = iter->next(); + TransactionStatus *status = outstandingTransactionStatus->get(key); + + // Check if the transaction is dead + if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator()) + && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) { + // Set committed + status->setStatus(TransactionStatus_StatusCommitted); - // Remove - iter->remove(); + // Remove + iter->remove(); + } } + delete iter; } } @@ -2334,8 +2476,8 @@ void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLo // Process each entry in the slot Vector *entries = slot->getEntries(); uint eSize = entries->size(); - for(uint ei=0; ei < eSize; ei++) { - Entry * entry = entries->get(ei); + for (uint ei = 0; ei < eSize; ei++) { + Entry *entry = entries->get(ei); switch (entry->getType()) { case TypeCommitPart: processEntry((CommitPart *)entry); @@ -2394,7 +2536,7 @@ void Table::processEntry(NewKey *entry) { * seen in this current round of updating the local copy of the block * chain */ -void Table::processEntry(TableStatus * entry, int64_t seq) { +void Table::processEntry(TableStatus *entry, int64_t seq) { int newNumSlots = entry->getMaxSlots(); updateCurrMaxSize(newNumSlots); initExpectedSize(seq, newNumSlots); @@ -2439,9 +2581,10 @@ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) { // Create a list of clients to watch until they see this rejected // message entry-> Hashset *deviceWatchSet = new Hashset(); - for (Map->Entry > *lastMessageEntry : lastMessageTable->entrySet()) { + SetIterator *> *iter = getKeyIterator(lastMessageTable); + while (iter->hasNext()) { // Machine ID for the last message entry - int64_t lastMessageEntryMachineId = lastMessageEntry->getKey(); + int64_t lastMessageEntryMachineId = iter->next(); // We've seen it, don't need to continue to watch-> Our next // message will implicitly acknowledge it-> @@ -2449,8 +2592,8 @@ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) { continue; } - Pair lastMessageValue = lastMessageEntry->getValue(); - int64_t entrySequenceNumber = lastMessageValue.getFirst(); + Pair *lastMessageValue = lastMessageTable->get(lastMessageEntryMachineId); + int64_t entrySequenceNumber = lastMessageValue->getFirst(); if (entrySequenceNumber < seq) { // Add this rejected message to the set of messages that this @@ -2461,9 +2604,12 @@ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) { deviceWatchSet->add(lastMessageEntryMachineId); } } + delete iter; + if (deviceWatchSet->isEmpty()) { // This rejected message has been seen by all the clients so entry->setDead(); + delete deviceWatchSet; } else { // We need to watch this rejected message entry->setWatchSet(deviceWatchSet); @@ -2485,7 +2631,8 @@ void Table::processEntry(Abort *entry) { // Abort has not been seen by the client it is for yet so we need to // keep track of it - Abort *previouslySeenAbort = liveAbortTable->put(entry->getAbortId(), entry); + + Abort *previouslySeenAbort = liveAbortTable->put(new Pair(entry->getAbortId()), entry); if (previouslySeenAbort != NULL) { previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version } @@ -2494,10 +2641,11 @@ void Table::processEntry(Abort *entry) { liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry); } - if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId()).getFirst() >= entry->getSequenceNumber())) { + if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) { // The machine already saw this so it is dead entry->setDead(); - liveAbortTable->remove(&entry->getAbortId()); + Pair abortid = entry->getAbortId(); + liveAbortTable->remove(&abortid); if (entry->getTransactionArbitrator() == localMachineId) { liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber()); @@ -2518,15 +2666,17 @@ void Table::processEntry(Abort *entry) { } // Set dead a transaction if we can - Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(Pair(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber())); + Pair deadPair = Pair(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber()); + + Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(&deadPair); if (transactionToSetDead != NULL) { liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber()); } // Update the last transaction sequence number that the arbitrator // arbitrated on - int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator()); - if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) { + if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getTransactionArbitrator()) || + (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator()) < entry->getTransactionSequenceNumber())) { // Is a valid one if (entry->getTransactionSequenceNumber() != -1) { lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber()); @@ -2541,8 +2691,7 @@ void Table::processEntry(Abort *entry) { void Table::processEntry(TransactionPart *entry) { // Check if we have already seen this transaction and set it dead OR // if it is not alive - int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId()); - if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry->getSequenceNumber())) { + if (lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getArbitratorId()) && (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId()) >= entry->getSequenceNumber())) { // This transaction is dead, it was already committed or aborted entry->setDead(); return; @@ -2559,7 +2708,7 @@ void Table::processEntry(TransactionPart *entry) { // Update the part and set dead ones we have already seen (got a // rescued version) - TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry); + TransactionPart *previouslySeenPart = transactionPart->put(new Pair(entry->getPartId()), entry); if (previouslySeenPart != NULL) { previouslySeenPart->setDead(); } @@ -2571,18 +2720,16 @@ void Table::processEntry(TransactionPart *entry) { void Table::processEntry(CommitPart *entry) { // Update the last transaction that was updated if we can if (entry->getTransactionSequenceNumber() != -1) { - int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()); - // Update the last transaction sequence number that the arbitrator - // arbitrated on - if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) { + if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId()) || + lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber()) { lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber()); } } - Hashtable *, CommitPart *> *commitPart = newCommitParts->get(entry->getMachineId()); + Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *commitPart = newCommitParts->get(entry->getMachineId()); if (commitPart == NULL) { // Don't have a table for this machine Id yet so make one - commitPart = new Hashtable *, CommitPart *>(); + commitPart = new Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals>(); newCommitParts->put(entry->getMachineId(), commitPart); } // Update the part and set dead ones we have already seen (got a @@ -2613,7 +2760,7 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liven // seen yet SetIterator *rmit = watchset->iterator(); - while(rmit->hasNext()) { + while (rmit->hasNext()) { RejectedMessage *rm = rmit->next(); // If this machine Id has seen this rejected message->->-> if (rm->getSequenceNumber() <= seqNum) { @@ -2627,20 +2774,24 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liven } // Set dead the abort - for (IteratorEntry, Abort *> > i = liveAbortTable->entrySet()->iterator(); i->hasNext();) { - Abort *abort = i->next()->getValue(); + SetIterator *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> *abortit = getKeyIterator(liveAbortTable); + + while (abortit->hasNext()) { + Pair *key = abortit->next(); + Abort *abort = liveAbortTable->get(key); if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) { abort->setDead(); - i->remove(); + abortit->remove(); if (abort->getTransactionArbitrator() == localMachineId) { liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber()); } } } + delete abortit; if (machineId == localMachineId) { // Our own messages are immediately dead-> char livenessType = liveness->getType(); - if (livenessType==TypeLastMessage) { + if (livenessType == TypeLastMessage) { ((LastMessage *)liveness)->setDead(); } else if (livenessType == TypeSlot) { ((Slot *)liveness)->setDead(); @@ -2649,7 +2800,7 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liven } } // Get the old last message for this device - Pair * lastMessageEntry = lastMessageTable->put(machineId, new Pair(seqNum, liveness)); + Pair *lastMessageEntry = lastMessageTable->put(machineId, new Pair(seqNum, liveness)); if (lastMessageEntry == NULL) { // If no last message then there is nothing else to process return; @@ -2658,11 +2809,11 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liven int64_t lastMessageSeqNum = lastMessageEntry->getFirst(); Liveness *lastEntry = lastMessageEntry->getSecond(); delete lastMessageEntry; - + // If it is not our machine Id since we already set ours to dead if (machineId != localMachineId) { char lastEntryType = lastEntry->getType(); - + if (lastEntryType == TypeLastMessage) { ((LastMessage *)lastEntry)->setDead(); } else if (lastEntryType == TypeSlot) { @@ -2710,7 +2861,7 @@ void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) { * Check if the HMAC chain is not violated */ void Table::checkHMACChain(SlotIndexer *indexer, Array *newSlots) { - for (int i = 0; i < newSlots->length(); i++) { + for (uint i = 0; i < newSlots->length(); i++) { Slot *currSlot = newSlots->get(i); Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1); if (prevSlot != NULL &&