X-Git-Url: http://plrg.eecs.uci.edu/git/?p=iotcloud.git;a=blobdiff_plain;f=version2%2Fsrc%2FC%2FTable.cc;h=7c3fde05215cfeb3b547f59d8064e38039b7ef51;hp=5f93fe894925c0f9321ff6b08c0f98d79556bb15;hb=d28d6cb0b30fcb629eb66feb8506c7e76a3652f8;hpb=178126b9fba2afc8f66e1c920f4991fd643d615b diff --git a/version2/src/C/Table.cc b/version2/src/C/Table.cc index 5f93fe8..7c3fde0 100644 --- a/version2/src/C/Table.cc +++ b/version2/src/C/Table.cc @@ -10,11 +10,27 @@ #include "TransactionStatus.h" #include "Transaction.h" #include "LastMessage.h" -#include "Random.h" +#include "SecureRandom.h" #include "ByteBuffer.h" #include "Abort.h" #include "CommitPart.h" - +#include "ArbitrationRound.h" +#include "TransactionPart.h" +#include "Commit.h" +#include "RejectedMessage.h" +#include "SlotIndexer.h" +#include + +int compareInt64(const void *a, const void *b) { + const int64_t *pa = (const int64_t *) a; + const int64_t *pb = (const int64_t *) b; + if (*pa < *pb) + return -1; + else if (*pa > *pb) + return 1; + else + return 0; +} Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) : buffer(NULL), @@ -30,6 +46,7 @@ Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, i oldestLiveSlotSequenceNumver(1), localMachineId(_localMachineId), sequenceNumber(0), + localSequenceNumber(0), localTransactionSequenceNumber(0), lastTransactionSequenceNumberSpeculatedOn(0), oldestTransactionSequenceNumberSpeculatedOn(0), @@ -92,6 +109,7 @@ Table::Table(CloudComm *_cloud, int64_t _localMachineId) : oldestLiveSlotSequenceNumver(1), localMachineId(_localMachineId), sequenceNumber(0), + localSequenceNumber(0), localTransactionSequenceNumber(0), lastTransactionSequenceNumberSpeculatedOn(0), oldestTransactionSequenceNumberSpeculatedOn(0), @@ -140,30 +158,64 @@ Table::Table(CloudComm *_cloud, int64_t _localMachineId) : init(); } +Table::~Table() { + delete cloud; + delete random; + delete buffer; + // init data structs + delete committedKeyValueTable; + delete speculatedKeyValueTable; + delete pendingTransactionSpeculatedKeyValueTable; + delete liveNewKeyTable; + delete lastMessageTable; + delete rejectedMessageWatchVectorTable; + delete arbitratorTable; + delete liveAbortTable; + delete newTransactionParts; + delete newCommitParts; + delete lastArbitratedTransactionNumberByArbitratorTable; + delete liveTransactionBySequenceNumberTable; + delete liveTransactionByTransactionIdTable; + delete liveCommitsTable; + delete liveCommitsByKeyTable; + delete lastCommitSeenSequenceNumberByArbitratorTable; + delete rejectedSlotVector; + delete pendingTransactionQueue; + delete pendingSendArbitrationEntriesToDelete; + delete transactionPartsSent; + delete outstandingTransactionStatus; + delete liveAbortsGeneratedByLocal; + delete offlineTransactionsCommittedAndAtServer; + delete localCommunicationTable; + delete lastTransactionSeenFromMachineFromServer; + delete pendingSendArbitrationRounds; + delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator; +} + /** * Init all the stuff needed for for table usage */ void Table::init() { // Init helper objects - random = new Random(); + random = new SecureRandom(); buffer = new SlotBuffer(); // init data structs - committedKeyValueTable = new Hashtable(); - speculatedKeyValueTable = new Hashtable(); - pendingTransactionSpeculatedKeyValueTable = new Hashtable(); - liveNewKeyTable = new Hashtable(); - lastMessageTable = new Hashtable >(); + committedKeyValueTable = new Hashtable(); + speculatedKeyValueTable = new Hashtable(); + pendingTransactionSpeculatedKeyValueTable = new Hashtable(); + liveNewKeyTable = new Hashtable(); + lastMessageTable = new Hashtable * >(); rejectedMessageWatchVectorTable = new Hashtable * >(); - arbitratorTable = new Hashtable(); - liveAbortTable = new Hashtable, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>(); - newTransactionParts = new Hashtable, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>(); - newCommitParts = new Hashtable, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>(); + arbitratorTable = new Hashtable(); + liveAbortTable = new Hashtable *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>(); + newTransactionParts = new Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>(); + newCommitParts = new Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>(); lastArbitratedTransactionNumberByArbitratorTable = new Hashtable(); liveTransactionBySequenceNumberTable = new Hashtable(); - liveTransactionByTransactionIdTable = new Hashtable, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>(); - liveCommitsTable = new Hashtable >(); - liveCommitsByKeyTable = new Hashtable(); + liveTransactionByTransactionIdTable = new Hashtable *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>(); + liveCommitsTable = new Hashtable * >(); + liveCommitsByKeyTable = new Hashtable(); lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable(); rejectedSlotVector = new Vector(); pendingTransactionQueue = new Vector(); @@ -171,8 +223,8 @@ void Table::init() { transactionPartsSent = new Hashtable *>(); outstandingTransactionStatus = new Hashtable(); liveAbortsGeneratedByLocal = new Hashtable(); - offlineTransactionsCommittedAndAtServer = new Hashset, uintptr_t, 0, pairHashFunction, pairEquals>(); - localCommunicationTable = new Hashtable >(); + offlineTransactionsCommittedAndAtServer = new Hashset *, uintptr_t, 0, pairHashFunction, pairEquals>(); + localCommunicationTable = new Hashtable *>(); lastTransactionSeenFromMachineFromServer = new Hashtable(); pendingSendArbitrationRounds = new Vector(); lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable(); @@ -222,7 +274,7 @@ void Table::rebuild() { } void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) { - localCommunicationTable->put(arbitrator, Pair(hostName, portNumber)); + localCommunicationTable->put(arbitrator, new Pair(hostName, portNumber)); } int64_t Table::getArbitrator(IoTString *key) { @@ -230,7 +282,7 @@ int64_t Table::getArbitrator(IoTString *key) { } void Table::close() { - cloud->close(); + cloud->closeCloud(); } IoTString *Table::getCommitted(IoTString *key) { @@ -321,8 +373,8 @@ bool Table::update() { updateLiveTransactionsAndStatus(); return true; } catch (Exception *e) { - SetIterator * kit = getKeyIterator(localCommunicationTable); - while(kit->hasNext()) { + SetIterator *> *kit = getKeyIterator(localCommunicationTable); + while (kit->hasNext()) { int64_t m = kit->next(); updateFromLocal(m); } @@ -334,7 +386,7 @@ bool Table::update() { bool Table::createNewKey(IoTString *keyName, int64_t machineId) { while (true) { - if (!arbitratorTable->contains(keyName)) { + if (arbitratorTable->contains(keyName)) { // There is already an arbitrator return false; } @@ -404,10 +456,10 @@ TransactionStatus *Table::commitTransaction() { Hashset *arbitratorTriedAndFailed = new Hashset(); uint size = pendingTransactionQueue->size(); uint oldindex = 0; - for(int iter = 0; iter < size; iter++) { + for (uint iter = 0; iter < size; iter++) { Transaction *transaction = pendingTransactionQueue->get(iter); pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter)); - + if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) { // Already contacted this client so ignore all attempts to contact this client // to preserve ordering for arbitrator @@ -430,7 +482,7 @@ TransactionStatus *Table::commitTransaction() { } pendingTransactionQueue->setSize(oldindex); } - + updateLiveStateFromLocal(); return transactionStatus; @@ -448,213 +500,224 @@ int64_t Table::getLocalSequenceNumber() { return localSequenceNumber; } -bool Table::sendToServer(NewKey *newKey) { - bool fromRetry = false; - try { - if (hadPartialSendToServer) { - Array *newSlots = cloud->getSlots(sequenceNumber + 1); - if (newSlots->length() == 0) { - fromRetry = true; - ThreeTuple *> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey); - - if (sendSlotsReturn.getFirst()) { - if (newKey != NULL) { - if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) { - newKey = NULL; - } - } - - SetIterator * trit = getKeyIterator(lastTransactionPartsSent); - while(trit->hasNext()) { - Transaction *transaction = trit->next(); - transaction->resetServerFailure(); - // Update which transactions parts still need to be sent - transaction->removeSentParts(lastTransactionPartsSent->get(transaction)); - // Add the transaction status to the outstanding list - outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus()); - - // Update the transaction status - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial); - - // Check if all the transaction parts were successfully - // sent and if so then remove it from pending - if (transaction->didSendAllParts()) { - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully); - pendingTransactionQueue->remove(transaction); - } - } - delete trit; - } else { - newSlots = sendSlotsReturn.getThird(); - bool isInserted = false; - for (uint si = 0; si < newSlots->length(); si++) { - Slot *s = newSlots->get(si); - if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) { +NewKey * Table::handlePartialSend(NewKey * newKey) { + //Didn't receive acknowledgement for last send + //See if the server has received a newer slot + + Array *newSlots = cloud->getSlots(sequenceNumber + 1); + if (newSlots->length() == 0) { + //Retry sending old slot + bool wasInserted = false; + bool sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey, &wasInserted, &newSlots); + + if (sendSlotsReturn) { + if (newKey != NULL) { + if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) { + newKey = NULL; + } + } + + SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); + transaction->resetServerFailure(); + // Update which transactions parts still need to be sent + transaction->removeSentParts(lastTransactionPartsSent->get(transaction)); + // Add the transaction status to the outstanding list + outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus()); + + // Update the transaction status + transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial); + + // Check if all the transaction parts were successfully + // sent and if so then remove it from pending + if (transaction->didSendAllParts()) { + transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully); + pendingTransactionQueue->remove(transaction); + } + } + delete trit; + } else { + bool isInserted = false; + for (uint si = 0; si < newSlots->length(); si++) { + Slot *s = newSlots->get(si); + if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) { + isInserted = true; + break; + } + } + + for (uint si = 0; si < newSlots->length(); si++) { + Slot *s = newSlots->get(si); + if (isInserted) { + break; + } + + // Process each entry in the slot + Vector *ventries = s->getEntries(); + uint vesize = ventries->size(); + for (uint vei = 0; vei < vesize; vei++) { + Entry *entry = ventries->get(vei); + if (entry->getType() == TypeLastMessage) { + LastMessage *lastMessage = (LastMessage *)entry; + if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) { isInserted = true; break; } } - - for (uint si = 0; si < newSlots->length(); si++) { - Slot *s = newSlots->get(si); - if (isInserted) { - break; - } - - // Process each entry in the slot - Vector * ventries=s->getEntries(); - uint vesize = ventries->size(); - for(uint vei = 0; vei < vesize; vei++) { - Entry *entry = ventries->get(vei); - if (entry->getType() == TypeLastMessage) { - LastMessage *lastMessage = (LastMessage *)entry; - if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) { - isInserted = true; - break; - } - } - } - } - - if (isInserted) { - if (newKey != NULL) { - if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) { - newKey = NULL; - } - } - - for (Transaction *transaction : lastTransactionPartsSent->keySet()) { - transaction->resetServerFailure(); - - // Update which transactions parts still need to be sent - transaction->removeSentParts(lastTransactionPartsSent->get(transaction)); - - // Add the transaction status to the outstanding list - outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus()); - - // Update the transaction status - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial); - - // Check if all the transaction parts were successfully sent and if so then remove it from pending - if (transaction->didSendAllParts()) { - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully); - pendingTransactionQueue->remove(transaction); - } else { - transaction->resetServerFailure(); - // Set the transaction sequence number back to nothing - if (!transaction->didSendAPartToServer()) { - transaction->setSequenceNumber(-1); - } - } - } + } + } + + if (isInserted) { + if (newKey != NULL) { + if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) { + newKey = NULL; } } - - for (Transaction *transaction : lastTransactionPartsSent->keySet()) { + + SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); transaction->resetServerFailure(); - // Set the transaction sequence number back to nothing - if (!transaction->didSendAPartToServer()) { - transaction->setSequenceNumber(-1); + + // Update which transactions parts still need to be sent + transaction->removeSentParts(lastTransactionPartsSent->get(transaction)); + + // Add the transaction status to the outstanding list + outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus()); + + // Update the transaction status + transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial); + + // Check if all the transaction parts were successfully sent and if so then remove it from pending + if (transaction->didSendAllParts()) { + transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully); + pendingTransactionQueue->remove(transaction); + } else { + transaction->resetServerFailure(); + // Set the transaction sequence number back to nothing + if (!transaction->didSendAPartToServer()) { + transaction->setSequenceNumber(-1); + } } } - - if (sendSlotsReturn.getThird()->length() != 0) { - // insert into the local block chain - validateAndUpdate(sendSlotsReturn.getThird(), true); - } - // continue; - } else { - bool isInserted = false; - for (uint si = 0; si < newSlots->length(); si++) { - Slot *s = newSlots->get(si); - if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) { + delete trit; + } + } + + SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); + transaction->resetServerFailure(); + // Set the transaction sequence number back to nothing + if (!transaction->didSendAPartToServer()) { + transaction->setSequenceNumber(-1); + } + } + delete trit; + + if (newSlots->length() != 0) { + // insert into the local block chain + validateAndUpdate(newSlots, true); + } + // continue; + } else { + bool isInserted = false; + for (uint si = 0; si < newSlots->length(); si++) { + Slot *s = newSlots->get(si); + if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) { + isInserted = true; + break; + } + } + + for (uint si = 0; si < newSlots->length(); si++) { + Slot *s = newSlots->get(si); + if (isInserted) { + break; + } + + // Process each entry in the slot + Vector *entries = s->getEntries(); + uint eSize = entries->size(); + for (uint ei = 0; ei < eSize; ei++) { + Entry *entry = entries->get(ei); + + if (entry->getType() == TypeLastMessage) { + LastMessage *lastMessage = (LastMessage *)entry; + if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) { isInserted = true; break; } } - - for (uint si = 0; si < newSlots->length(); si++) { - Slot *s = newSlots->get(si); - if (isInserted) { - break; - } - - // Process each entry in the slot - for (Entry *entry : s->getEntries()) { - - if (entry->getType() == TypeLastMessage) { - LastMessage *lastMessage = (LastMessage *)entry; - if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) { - isInserted = true; - break; - } - } - } + } + } + + if (isInserted) { + if (newKey != NULL) { + if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) { + newKey = NULL; } - - if (isInserted) { - if (newKey != NULL) { - if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) { - newKey = NULL; - } - } - - for (Transaction *transaction : lastTransactionPartsSent->keySet()) { - transaction->resetServerFailure(); - - // Update which transactions parts still need to be sent - transaction->removeSentParts(lastTransactionPartsSent->get(transaction)); - - // Add the transaction status to the outstanding list - outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus()); - - // Update the transaction status - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial); - - // Check if all the transaction parts were successfully sent and if so then remove it from pending - if (transaction->didSendAllParts()) { - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully); - pendingTransactionQueue->remove(transaction); - } else { - transaction->resetServerFailure(); - // Set the transaction sequence number back to nothing - if (!transaction->didSendAPartToServer()) { - transaction->setSequenceNumber(-1); - } - } - } + } + + SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); + transaction->resetServerFailure(); + + // Update which transactions parts still need to be sent + transaction->removeSentParts(lastTransactionPartsSent->get(transaction)); + + // Add the transaction status to the outstanding list + outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus()); + + // Update the transaction status + transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial); + + // Check if all the transaction parts were successfully sent and if so then remove it from pending + if (transaction->didSendAllParts()) { + transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully); + pendingTransactionQueue->remove(transaction); } else { - for (Transaction *transaction : lastTransactionPartsSent->keySet()) { - transaction->resetServerFailure(); - // Set the transaction sequence number back to nothing - if (!transaction->didSendAPartToServer()) { - transaction->setSequenceNumber(-1); - } + transaction->resetServerFailure(); + // Set the transaction sequence number back to nothing + if (!transaction->didSendAPartToServer()) { + transaction->setSequenceNumber(-1); } } - - // insert into the local block chain - validateAndUpdate(newSlots, true); } + delete trit; + } else { + SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); + transaction->resetServerFailure(); + // Set the transaction sequence number back to nothing + if (!transaction->didSendAPartToServer()) { + transaction->setSequenceNumber(-1); + } + } + delete trit; } - } catch (ServerException *e) { - throw e; + + // insert into the local block chain + validateAndUpdate(newSlots, true); } + return newKey; +} - +bool Table::sendToServer(NewKey *newKey) { + if (hadPartialSendToServer) { + newKey = handlePartialSend(newKey); + } try { // While we have stuff that needs inserting into the block chain while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) { - - fromRetry = false; - if (hadPartialSendToServer) { throw new Error("Should Be error free"); } - - - + // If there is a new key with same name then end if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) { return false; @@ -672,7 +735,9 @@ bool Table::sendToServer(NewKey *newKey) { if (needsResize) { // Reset which transaction to send - for (Transaction *transaction : transactionPartsSent->keySet()) { + SetIterator *> *trit = getKeyIterator(transactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); transaction->resetNextPartToSend(); // Set the transaction sequence number back to nothing @@ -680,6 +745,7 @@ bool Table::sendToServer(NewKey *newKey) { transaction->setSequenceNumber(-1); } } + delete trit; // Clear the sent data since we are trying again pendingSendArbitrationEntriesToDelete->clear(); @@ -694,14 +760,14 @@ bool Table::sendToServer(NewKey *newKey) { lastInsertedNewKey = insertedNewKey; lastNewSize = newSize; lastNewKey = newKey; - lastTransactionPartsSent = new Hashtable * >(transactionPartsSent); + lastTransactionPartsSent = transactionPartsSent->clone(); lastPendingSendArbitrationEntriesToDelete = new Vector(pendingSendArbitrationEntriesToDelete); + Array * newSlots = NULL; + bool wasInserted = false; + bool sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL, &wasInserted, &newSlots); - ThreeTuple *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL); - - if (sendSlotsReturn.getFirst()) { - + if (sendSlotsReturn) { // Did insert into the block chain if (insertedNewKey) { @@ -714,19 +780,21 @@ bool Table::sendToServer(NewKey *newKey) { // Remove the aborts and commit parts that were sent from the pending to send queue uint size = pendingSendArbitrationRounds->size(); uint oldcount = 0; - for (uint i=0; i < size; i++) - ArbitrationRound *round = pendingSendArbitrartionRounds->get(i); + for (uint i = 0; i < size; i++) { + ArbitrationRound *round = pendingSendArbitrationRounds->get(i); round->removeParts(pendingSendArbitrationEntriesToDelete); if (!round->isDoneSending()) { // Sent all the parts - pendingSendArbitrartionRounds->set(oldcount++, - pendingSendArbitrartionRounds->get(i)); + pendingSendArbitrationRounds->set(oldcount++, + pendingSendArbitrationRounds->get(i)); } } - pendingSendArbitrationRounds->setSize(oldcount); + pendingSendArbitrationRounds->setSize(oldcount); - for (Transaction *transaction : transactionPartsSent->keySet()) { + SetIterator *> *trit = getKeyIterator(transactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); transaction->resetServerFailure(); // Update which transactions parts still need to be sent @@ -744,9 +812,12 @@ bool Table::sendToServer(NewKey *newKey) { pendingTransactionQueue->remove(transaction); } } + delete trit; } else { // Reset which transaction to send - for (Transaction *transaction : transactionPartsSent->keySet()) { + SetIterator *> *trit = getKeyIterator(transactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); transaction->resetNextPartToSend(); // Set the transaction sequence number back to nothing @@ -754,23 +825,25 @@ bool Table::sendToServer(NewKey *newKey) { transaction->setSequenceNumber(-1); } } + delete trit; } // Clear the sent data in preparation for next send pendingSendArbitrationEntriesToDelete->clear(); transactionPartsSent->clear(); - if (sendSlotsReturn.getThird()->length() != 0) { + if (newSlots->length() != 0) { // insert into the local block chain - validateAndUpdate(sendSlotsReturn.getThird(), true); + validateAndUpdate(newSlots, true); } } } catch (ServerException *e) { - - if (e->getType() != ServerException->TypeInputTimeout) { + if (e->getType() != ServerException_TypeInputTimeout) { // Nothing was able to be sent to the server so just clear these data structures - for (Transaction *transaction : transactionPartsSent->keySet()) { + SetIterator *> *trit = getKeyIterator(transactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); transaction->resetNextPartToSend(); // Set the transaction sequence number back to nothing @@ -778,15 +851,19 @@ bool Table::sendToServer(NewKey *newKey) { transaction->setSequenceNumber(-1); } } + delete trit; } else { // There was a partial send to the server hadPartialSendToServer = true; // Nothing was able to be sent to the server so just clear these data structures - for (Transaction *transaction : transactionPartsSent->keySet()) { + SetIterator *> *trit = getKeyIterator(transactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); transaction->resetNextPartToSend(); transaction->setServerFailure(); } + delete trit; } pendingSendArbitrationEntriesToDelete->clear(); @@ -799,17 +876,16 @@ bool Table::sendToServer(NewKey *newKey) { } bool Table::updateFromLocal(int64_t machineId) { - Pair localCommunicationInformation = localCommunicationTable->get(machineId); - if (localCommunicationInformation == NULL) { - // Cant talk to that device locally so do nothing + if (!localCommunicationTable->contains(machineId)) return false; - } + + Pair *localCommunicationInformation = localCommunicationTable->get(machineId); // Get the size of the send data int sendDataSize = sizeof(int32_t) + sizeof(int64_t); int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1; - if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId) != NULL) { + if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(machineId)) { lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId); } @@ -821,7 +897,7 @@ bool Table::updateFromLocal(int64_t machineId) { bbEncode->putInt(0); // Send by local - Array *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond()); + Array *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond()); localSequenceNumber++; if (returnData == NULL) { @@ -836,10 +912,10 @@ bool Table::updateFromLocal(int64_t machineId) { for (int i = 0; i < numberOfEntries; i++) { char type = bbDecode->get(); if (type == TypeAbort) { - Abort *abort = (Abort*)Abort_decode(NULL, bbDecode); + Abort *abort = (Abort *)Abort_decode(NULL, bbDecode); processEntry(abort); } else if (type == TypeCommitPart) { - CommitPart *commitPart = (CommitPart*)CommitPart_decode(NULL, bbDecode); + CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode); processEntry(commitPart); } } @@ -852,38 +928,45 @@ bool Table::updateFromLocal(int64_t machineId) { Pair Table::sendTransactionToLocal(Transaction *transaction) { // Get the devices local communications - Pair localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator()); - - if (localCommunicationInformation == NULL) { - // Cant talk to that device locally so do nothing + if (!localCommunicationTable->contains(transaction->getArbitrator())) return Pair(true, false); - } + + Pair *localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator()); // Get the size of the send data int sendDataSize = sizeof(int32_t) + sizeof(int64_t); - for (TransactionPart *part : transaction->getParts()->values()) { - sendDataSize += part->getSize(); + { + Vector *tParts = transaction->getParts(); + uint tPartsSize = tParts->size(); + for (uint i = 0; i < tPartsSize; i++) { + TransactionPart *part = tParts->get(i); + sendDataSize += part->getSize(); + } } int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1; - if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator()) != NULL) { + if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(transaction->getArbitrator())) { lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator()); } // Make the send data size Array *sendData = new Array(sendDataSize); - ByteBuffer *bbEncode = ByteBuffer.wrap(sendData); + ByteBuffer *bbEncode = ByteBuffer_wrap(sendData); // Encode the data bbEncode->putLong(lastArbitrationDataLocalSequenceNumber); bbEncode->putInt(transaction->getParts()->size()); - for (TransactionPart *part : transaction->getParts()->values()) { - part->encode(bbEncode); + { + Vector *tParts = transaction->getParts(); + uint tPartsSize = tParts->size(); + for (uint i = 0; i < tPartsSize; i++) { + TransactionPart *part = tParts->get(i); + part->encode(bbEncode); + } } - // Send by local - Array *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond()); + Array *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond()); localSequenceNumber++; if (returnData == NULL) { @@ -901,7 +984,7 @@ Pair Table::sendTransactionToLocal(Transaction *transaction) { for (int i = 0; i < numberOfEntries; i++) { char type = bbDecode->get(); if (type == TypeAbort) { - Abort *abort = (Abort*)Abort_decode(NULL, bbDecode); + Abort *abort = (Abort *)Abort_decode(NULL, bbDecode); if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) { foundAbort = true; @@ -909,7 +992,7 @@ Pair Table::sendTransactionToLocal(Transaction *transaction) { processEntry(abort); } else if (type == TypeCommitPart) { - CommitPart *commitPart = (CommitPart*)CommitPart_decode(NULL, bbDecode); + CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode); processEntry(commitPart); } } @@ -917,14 +1000,14 @@ Pair Table::sendTransactionToLocal(Transaction *transaction) { updateLiveStateFromLocal(); if (couldArbitrate) { - TransactionStatus status = transaction->getTransactionStatus(); + TransactionStatus *status = transaction->getTransactionStatus(); if (didCommit) { status->setStatus(TransactionStatus_StatusCommitted); } else { status->setStatus(TransactionStatus_StatusAborted); } } else { - TransactionStatus status = transaction->getTransactionStatus(); + TransactionStatus *status = transaction->getTransactionStatus(); if (foundAbort) { status->setStatus(TransactionStatus_StatusAborted); } else { @@ -936,7 +1019,6 @@ Pair Table::sendTransactionToLocal(Transaction *transaction) { } Array *Table::acceptDataFromLocal(Array *data) { - // Decode the data ByteBuffer *bbDecode = ByteBuffer_wrap(data); int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong(); @@ -952,7 +1034,7 @@ Array *Table::acceptDataFromLocal(Array *data) { Transaction *transaction = new Transaction(); for (int i = 0; i < numberOfParts; i++) { bbDecode->get(); - TransactionPart *newPart = (TransactionPart)TransactionPart.decode(NULL, bbDecode); + TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode); transaction->addPartDecode(newPart); } @@ -965,7 +1047,7 @@ Array *Table::acceptDataFromLocal(Array *data) { // Transaction was sent to the server so keep track of it to prevent double commit if (transaction->getSequenceNumber() != -1) { - offlineTransactionsCommittedAndAtServer->add(transaction->getId()); + offlineTransactionsCommittedAndAtServer->add(new Pair(transaction->getId())); } } @@ -974,9 +1056,19 @@ Array *Table::acceptDataFromLocal(Array *data) { Vector *unseenArbitrations = new Vector(); // Get the aborts to send back - Vector *abortLocalSequenceNumbers = new Vector(liveAbortsGeneratedByLocal->keySet()); - Collections->sort(abortLocalSequenceNumbers); - for (int64_t localSequenceNumber : abortLocalSequenceNumbers) { + Vector *abortLocalSequenceNumbers = new Vector(); + { + SetIterator *abortit = getKeyIterator(liveAbortsGeneratedByLocal); + while (abortit->hasNext()) + abortLocalSequenceNumbers->add(abortit->next()); + delete abortit; + } + + qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64); + + uint asize = abortLocalSequenceNumbers->size(); + for (uint i = 0; i < asize; i++) { + int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i); if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) { continue; } @@ -989,20 +1081,32 @@ Array *Table::acceptDataFromLocal(Array *data) { // Get the commits to send back Hashtable *commitForClientTable = liveCommitsTable->get(localMachineId); if (commitForClientTable != NULL) { - Vector *commitLocalSequenceNumbers = new Vector(commitForClientTable->keySet()); - Collections->sort(commitLocalSequenceNumbers); - - for (int64_t localSequenceNumber : commitLocalSequenceNumbers) { + Vector *commitLocalSequenceNumbers = new Vector(); + { + SetIterator *commitit = getKeyIterator(commitForClientTable); + while (commitit->hasNext()) + commitLocalSequenceNumbers->add(commitit->next()); + delete commitit; + } + qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64); + + uint clsSize = commitLocalSequenceNumbers->size(); + for (uint clsi = 0; clsi < clsSize; clsi++) { + int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi); Commit *commit = commitForClientTable->get(localSequenceNumber); if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) { continue; } - unseenArbitrations->addAll(commit->getParts()->values()); - - for (CommitPart *commitPart : commit->getParts()->values()) { - returnDataSize += commitPart->getSize(); + { + Vector *parts = commit->getParts(); + uint nParts = parts->size(); + for (uint i = 0; i < nParts; i++) { + CommitPart *commitPart = parts->get(i); + unseenArbitrations->add(commitPart); + returnDataSize += commitPart->getSize(); + } } } } @@ -1034,8 +1138,8 @@ Array *Table::acceptDataFromLocal(Array *data) { bbEncode->putInt(unseenArbitrations->size()); uint size = unseenArbitrations->size(); - for(uint i = 0; i< size; i++) { - Entry * entry = unseenArbitrations->get(i); + for (uint i = 0; i < size; i++) { + Entry *entry = unseenArbitrations->get(i); entry->encode(bbEncode); } @@ -1043,70 +1147,72 @@ Array *Table::acceptDataFromLocal(Array *data) { return returnData; } -ThreeTuple *> Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey) { - bool attemptedToSendToServerTmp = attemptedToSendToServer; - attemptedToSendToServer = true; - bool inserted = false; - bool lastTryInserted = false; +/** Method tries to send slot to server. Returns status in tuple. + isInserted returns whether last un-acked send (if any) was + successful. Returns whether send was confirmed.x + */ - Array *array = cloud->putSlot(slot, newSize); - if (array == NULL) { - array = new Array(); - array->set(0, slot); +bool Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey, bool *isInserted, Array **array) { + attemptedToSendToServer = true; + + *array = cloud->putSlot(slot, newSize); + if (*array == NULL) { + *array = new Array(1); + (*array)->set(0, slot); rejectedSlotVector->clear(); - inserted = true; + *isInserted = false; + return true; } else { - if (array->length() == 0) { + if ((*array)->length() == 0) { throw new Error("Server Error: Did not send any slots"); } - // if (attemptedToSendToServerTmp) { if (hadPartialSendToServer) { - - bool isInserted = false; - uint size = array->size(); - for(uint i=0; i < size; i++) { - Slot *s = array->get(i); + *isInserted = false; + uint size = (*array)->length(); + for (uint i = 0; i < size; i++) { + Slot *s = (*array)->get(i); if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) { - isInserted = true; + *isInserted = true; break; } } - for(uint i=0; i < size; i++) { - Slot *s = array->get(i); - if (isInserted) { - break; - } - - // Process each entry in the slot - for (Entry *entry : s->getEntries()) { - - if (entry->getType() == TypeLastMessage) { - LastMessage *lastMessage = (LastMessage *)entry; - - if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) { - isInserted = true; - break; + //Also need to see if other machines acknowledged our message + if (!(*isInserted)) { + for (uint i = 0; i < size; i++) { + Slot *s = (*array)->get(i); + + // Process each entry in the slot + Vector *entries = s->getEntries(); + uint eSize = entries->size(); + for (uint ei = 0; ei < eSize; ei++) { + Entry *entry = entries->get(ei); + + if (entry->getType() == TypeLastMessage) { + LastMessage *lastMessage = (LastMessage *)entry; + + if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) { + *isInserted = true; + goto done; + } } } } } - - if (!isInserted) { + done: + if (!(*isInserted)) { rejectedSlotVector->add(slot->getSequenceNumber()); - lastTryInserted = false; - } else { - lastTryInserted = true; } + + return false; } else { rejectedSlotVector->add(slot->getSequenceNumber()); - lastTryInserted = false; + *isInserted = false; + return false; } } - - return ThreeTuple *>(inserted, lastTryInserted, array); } /** @@ -1115,7 +1221,7 @@ ThreeTuple *> Table::sendSlotsToServer(Slot *slot, int ThreeTuple Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) { int newSize = 0; if (liveSlotCount > bufferResizeThreshold) { - resize = true; //Resize is forced + resize = true;//Resize is forced } if (resize) { @@ -1153,17 +1259,17 @@ ThreeTuple Table::fillSlot(Slot *slot, bool resize, NewKey transactionPartsSent->clear(); pendingSendArbitrationEntriesToDelete->clear(); uint size = pendingSendArbitrationRounds->size(); - for (uint i=0; iget(i); + for (uint i = 0; i < size; i++) { + ArbitrationRound *round = pendingSendArbitrationRounds->get(i); bool isFull = false; round->generateParts(); Vector *parts = round->getParts(); // Insert pending arbitration data uint vsize = parts->size(); - for (uint vi=0; viget(vi); - + // If it is an abort then we need to set some information if (arbitrationData->getType() == TypeAbort) { ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber()); @@ -1226,14 +1332,14 @@ void Table::doRejectedMessages(Slot *s) { * there is already a sufficient entry in the queue (e->g->, * equalsto value of true and same sequence number)-> */ - int64_t old_seqn = rejectedSlotVector->firstElement(); + int64_t old_seqn = rejectedSlotVector->get(0); if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) { int64_t new_seqn = rejectedSlotVector->lastElement(); RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false); s->addEntry(rm); } else { int64_t prev_seqn = -1; - int i = 0; + uint i = 0; /* Go through list of missing messages */ for (; i < rejectedSlotVector->size(); i++) { int64_t curr_seqn = rejectedSlotVector->get(i); @@ -1274,7 +1380,7 @@ ThreeTuple Table::doMandatoryResuce(Slot *slot, bool resize // Mandatory Rescue for (; currentSequenceNumber < threshold; currentSequenceNumber++) { - Slot previousSlot = buffer->getSlot(currentSequenceNumber); + Slot *previousSlot = buffer->getSlot(currentSequenceNumber); // Push slot number forward if (!seenLiveSlot) { oldestLiveSlotSequenceNumver = currentSequenceNumber; @@ -1291,7 +1397,9 @@ ThreeTuple Table::doMandatoryResuce(Slot *slot, bool resize Vector *liveEntries = previousSlot->getLiveEntries(resize); // Iterate over all the live entries and try to rescue them - for (Entry *liveEntry : liveEntries) { + uint lESize = liveEntries->size(); + for (uint i = 0; i < lESize; i++) { + Entry *liveEntry = liveEntries->get(i); if (slot->hasSpace(liveEntry)) { // Enough space to rescue the entry slot->addEntry(liveEntry); @@ -1312,7 +1420,6 @@ void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resi * for SKIP_THRESHOLD consecutive entries*/ int skipcount = 0; int64_t newestseqnum = buffer->getNewestSeqNum(); -search: for (; seqn <= newestseqnum; seqn++) { Slot *prevslot = buffer->getSlot(seqn); //Push slot number forward @@ -1323,16 +1430,20 @@ search: continue; seenliveslot = true; Vector *liveentries = prevslot->getLiveEntries(resize); - for (Entry *liveentry : liveentries) { + uint lESize = liveentries->size(); + for (uint i = 0; i < lESize; i++) { + Entry *liveentry = liveentries->get(i); if (s->hasSpace(liveentry)) s->addEntry(liveentry); else { skipcount++; if (skipcount > Table_SKIP_THRESHOLD) - break search; + goto donesearch; } } } +donesearch: + ; } /** @@ -1360,13 +1471,22 @@ void Table::validateAndUpdate(Array *newSlots, bool acceptUpdatesToLocal checkHMACChain(indexer, newSlots); // Set to keep track of messages from clients - Hashset *machineSet = new Hashset(lastMessageTable->keySet()); + Hashset *machineSet = new Hashset(); + { + SetIterator *> *lmit = getKeyIterator(lastMessageTable); + while (lmit->hasNext()) + machineSet->add(lmit->next()); + delete lmit; + } // Process each slots data - for (Slot *slot : newSlots) { - processSlot(indexer, slot, acceptUpdatesToLocal, machineSet); - - updateExpectedSize(); + { + uint numSlots = newSlots->length(); + for (uint i = 0; i < numSlots; i++) { + Slot *slot = newSlots->get(i); + processSlot(indexer, slot, acceptUpdatesToLocal, machineSet); + updateExpectedSize(); + } } // If there is a gap, check to see if the server sent us @@ -1375,7 +1495,7 @@ void Table::validateAndUpdate(Array *newSlots, bool acceptUpdatesToLocal // Check the size of the slots that were sent down by the server-> // Can only check the size if there was a gap - checkNumSlots(newSlots->length); + checkNumSlots(newSlots->length()); // Since there was a gap every machine must have pushed a slot or // must have a last message message-> If not then the server is @@ -1389,16 +1509,19 @@ void Table::validateAndUpdate(Array *newSlots, bool acceptUpdatesToLocal commitNewMaxSize(); // Commit new to slots to the local block chain-> - for (Slot *slot : newSlots) { + { + uint numSlots = newSlots->length(); + for (uint i = 0; i < numSlots; i++) { + Slot *slot = newSlots->get(i); - // Insert this slot into our local block chain copy-> - buffer->putSlot(slot); + // Insert this slot into our local block chain copy-> + buffer->putSlot(slot); - // Keep track of how many slots are currently live (have live data - // in them)-> - liveSlotCount++; + // Keep track of how many slots are currently live (have live data + // in them)-> + liveSlotCount++; + } } - // Get the sequence number of the latest slot in the system sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber(); updateLiveStateFromServer(); @@ -1474,11 +1597,6 @@ void Table::checkNumSlots(int numberOfSlots) { } } -void Table::updateCurrMaxSize(int newmaxsize) { - currMaxSize = newmaxsize; -} - - /** * Update the size of of the local buffer if it is needed-> */ @@ -1511,18 +1629,24 @@ void Table::processNewTransactionParts() { // Iterate through all the machine Ids that we received new parts // for - for (int64_t machineId : newTransactionParts->keySet()) { - Hashtable, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId); + SetIterator *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *tpit = getKeyIterator(newTransactionParts); + while (tpit->hasNext()) { + int64_t machineId = tpit->next(); + Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId); + SetIterator *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts); // Iterate through all the parts for that machine Id - for (Pair partId : parts->keySet()) { + while (ptit->hasNext()) { + Pair *partId = ptit->next(); TransactionPart *part = parts->get(partId); - int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId()); - if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= part->getSequenceNumber())) { - // Set dead the transaction part - part->setDead(); - continue; + if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) { + int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId()); + if (lastTransactionNumber >= part->getSequenceNumber()) { + // Set dead the transaction part + part->setDead(); + continue; + } } // Get the transaction object for that sequence number @@ -1534,14 +1658,15 @@ void Table::processNewTransactionParts() { // Insert this new transaction into the live tables liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction); - liveTransactionByTransactionIdTable->put(part->getTransactionId(), transaction); + liveTransactionByTransactionIdTable->put(new Pair(part->getTransactionId()), transaction); } // Add that part to the transaction transaction->addPartDecode(part); } + delete ptit; } - + delete tpit; // Clear all the new transaction parts in preparation for the next // time the server sends slots newTransactionParts->clear(); @@ -1555,21 +1680,26 @@ void Table::arbitrateFromServer() { } // Get the transaction sequence numbers and sort from oldest to newest - Vector *transactionSequenceNumbers = new Vector(liveTransactionBySequenceNumberTable->keySet()); - Collections->sort(transactionSequenceNumbers); + Vector *transactionSequenceNumbers = new Vector(); + { + SetIterator *trit = getKeyIterator(liveTransactionBySequenceNumberTable); + while (trit->hasNext()) + transactionSequenceNumbers->add(trit->next()); + delete trit; + } + qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64); // Collection of key value pairs that are - Hashtable speculativeTableTmp = new Hashtable(); + Hashtable *speculativeTableTmp = new Hashtable(); // The last transaction arbitrated on int64_t lastTransactionCommitted = -1; Hashset *generatedAborts = new Hashset(); - - for (int64_t transactionSequenceNumber : transactionSequenceNumbers) { + uint tsnSize = transactionSequenceNumbers->size(); + for (uint i = 0; i < tsnSize; i++) { + int64_t transactionSequenceNumber = transactionSequenceNumbers->get(i); Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber); - - // Check if this machine arbitrates for this transaction if not // then we cant arbitrate this transaction if (transaction->getArbitrator() != localMachineId) { @@ -1594,7 +1724,7 @@ void Table::arbitrateFromServer() { // update the largest transaction seen by arbitrator from server - if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) == NULL) { + if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) { lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber()); } else { int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()); @@ -1607,9 +1737,12 @@ void Table::arbitrateFromServer() { // Guard evaluated as true // Update the local changes so we can make the commit - for (KeyValue *kv : transaction->getKeyValueUpdateSet()) { + SetIterator *kvit = transaction->getKeyValueUpdateSet()->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); speculativeTableTmp->put(kv->getKey(), kv); } + delete kvit; // Update what the last transaction committed was for use in batch commit lastTransactionCommitted = transactionSequenceNumber; @@ -1641,9 +1774,13 @@ void Table::arbitrateFromServer() { localArbitrationSequenceNumber++; // Add all the new keys to the commit - for (KeyValue *kv : speculativeTableTmp->values()) { + SetIterator *spit = getKeyIterator(speculativeTableTmp); + while (spit->hasNext()) { + IoTString *string = spit->next(); + KeyValue *kv = speculativeTableTmp->get(string); newCommit->addKV(kv); } + delete spit; // create the commit parts newCommit->createCommitParts(); @@ -1651,7 +1788,10 @@ void Table::arbitrateFromServer() { // Append all the commit parts to the end of the pending queue // waiting for sending to the server // Insert the commit so we can process it - for (CommitPart *commitPart : newCommit->getParts()->values()) { + Vector *parts = newCommit->getParts(); + uint partsSize = parts->size(); + for (uint i = 0; i < partsSize; i++) { + CommitPart *commitPart = parts->get(i); processEntry(commitPart); } } @@ -1663,7 +1803,10 @@ void Table::arbitrateFromServer() { if (compactArbitrationData()) { ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); if (newArbitrationRound->getCommit() != NULL) { - for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) { + Vector *parts = newArbitrationRound->getCommit()->getParts(); + uint partsSize = parts->size(); + for (uint i = 0; i < partsSize; i++) { + CommitPart *commitPart = parts->get(i); processEntry(commitPart); } } @@ -1687,7 +1830,7 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { if (transaction->getMachineId() != localMachineId) { // dont do this check for local transactions - if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) != NULL) { + if (lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) { if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) { // We've have already seen this from the server return Pair(false, false); @@ -1702,9 +1845,12 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { localArbitrationSequenceNumber++; // Update the local changes so we can make the commit - for (KeyValue *kv : transaction->getKeyValueUpdateSet()) { + SetIterator *kvit = transaction->getKeyValueUpdateSet()->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); newCommit->addKV(kv); } + delete kvit; // create the commit parts newCommit->createCommitParts(); @@ -1716,12 +1862,18 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { if (compactArbitrationData()) { ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); - for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) { + Vector *parts = newArbitrationRound->getCommit()->getParts(); + uint partsSize = parts->size(); + for (uint i = 0; i < partsSize; i++) { + CommitPart *commitPart = parts->get(i); processEntry(commitPart); } } else { // Insert the commit so we can process it - for (CommitPart *commitPart : newCommit->getParts()->values()) { + Vector *parts = newCommit->getParts(); + uint partsSize = parts->size(); + for (uint i = 0; i < partsSize; i++) { + CommitPart *commitPart = parts->get(i); processEntry(commitPart); } } @@ -1739,7 +1891,7 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { if (transaction->getMachineId() == localMachineId) { // For locally created messages update the status // Guard evaluated was false so create abort - TransactionStatus status = transaction->getTransactionStatus(); + TransactionStatus *status = transaction->getTransactionStatus(); if (status != NULL) { status->setStatus(TransactionStatus_StatusAborted); } @@ -1763,7 +1915,11 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { if (compactArbitrationData()) { ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); - for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) { + + Vector *parts = newArbitrationRound->getCommit()->getParts(); + uint partsSize = parts->size(); + for (uint i = 0; i < partsSize; i++) { + CommitPart *commitPart = parts->get(i); processEntry(commitPart); } } @@ -1786,18 +1942,18 @@ bool Table::compactArbitrationData() { } ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); - if (lastRound->didSendPart()) { + if (lastRound->getDidSendPart()) { return false; } bool hadCommit = (lastRound->getCommit() == NULL); bool gotNewCommit = false; - int numberToDelete = 1; + uint numberToDelete = 1; while (numberToDelete < pendingSendArbitrationRounds->size()) { - ArbitrationRound *xs round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1); + ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1); - if (round->isFull() || round->didSendPart()) { + if (round->isFull() || round->getDidSendPart()) { // Stop since there is a part that cannot be compacted and we // need to compact in order break; @@ -1806,14 +1962,14 @@ bool Table::compactArbitrationData() { if (round->getCommit() == NULL) { // Try compacting aborts only int newSize = round->getCurrentSize() + lastRound->getAbortsCount(); - if (newSize > ArbitrationRound->MAX_PARTS) { + if (newSize > ArbitrationRound_MAX_PARTS) { // Cant compact since it would be too large break; } lastRound->addAborts(round->getAborts()); } else { // Create a new larger commit - Commit newCommit = Commit->merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber); + Commit *newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber); localArbitrationSequenceNumber++; // Create the commit parts so that we can count them @@ -1824,7 +1980,7 @@ bool Table::compactArbitrationData() { newSize += lastRound->getAbortsCount(); newSize += round->getAbortsCount(); - if (newSize > ArbitrationRound->MAX_PARTS) { + if (newSize > ArbitrationRound_MAX_PARTS) { // Cant compact since it would be too large break; } @@ -1844,8 +2000,8 @@ bool Table::compactArbitrationData() { if (numberToDelete == pendingSendArbitrationRounds->size()) { pendingSendArbitrationRounds->clear(); } else { - for (int i = 0; i < numberToDelete; i++) { - pendingSendArbitrationRounds->remove(pendingSendArbitrationRounds->size() - 1); + for (uint i = 0; i < numberToDelete; i++) { + pendingSendArbitrationRounds->removeIndex(pendingSendArbitrationRounds->size() - 1); } } @@ -1873,11 +2029,15 @@ bool Table::updateCommittedTable() { } // Iterate through all the machine Ids that we received new parts for - for (int64_t machineId : newCommitParts->keySet()) { - Hashtable, CommitPart *> *parts = newCommitParts->get(machineId); + SetIterator *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts); + while (partsit->hasNext()) { + int64_t machineId = partsit->next(); + Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId); // Iterate through all the parts for that machine Id - for (Pair partId : parts->keySet()) { + SetIterator *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pairit = getKeyIterator(parts); + while (pairit->hasNext()) { + Pair *partId = pairit->next(); CommitPart *part = parts->get(partId); // Get the transaction object for that sequence number @@ -1902,7 +2062,9 @@ bool Table::updateCommittedTable() { // Add that part to the commit commit->addPartDecode(part); } + delete pairit; } + delete partsit; // Clear all the new commits parts in preparation for the next time // the server sends slots @@ -1912,23 +2074,32 @@ bool Table::updateCommittedTable() { bool didProcessANewCommit = false; // Process the commits one by one - for (int64_T arbitratorId : liveCommitsTable->keySet()) { + SetIterator *> *liveit = getKeyIterator(liveCommitsTable); + while (liveit->hasNext()) { + int64_t arbitratorId = liveit->next(); // Get all the commits for a specific arbitrator Hashtable *commitForClientTable = liveCommitsTable->get(arbitratorId); // Sort the commits in order - Vector *commitSequenceNumbers = new Vector(commitForClientTable->keySet()); - Collections->sort(commitSequenceNumbers); + Vector *commitSequenceNumbers = new Vector(); + { + SetIterator *clientit = getKeyIterator(commitForClientTable); + while (clientit->hasNext()) + commitSequenceNumbers->add(clientit->next()); + delete clientit; + } + + qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64); // Get the last commit seen from this arbitrator int64_t lastCommitSeenSequenceNumber = -1; - if (lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId) != NULL) { + if (lastCommitSeenSequenceNumberByArbitratorTable->contains(arbitratorId)) { lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId); } // Go through each new commit one by one - for (int i = 0; i < commitSequenceNumbers->size(); i++) { + for (uint i = 0; i < commitSequenceNumbers->size(); i++) { int64_t commitSequenceNumber = commitSequenceNumbers->get(i); Commit *commit = commitForClientTable->get(commitSequenceNumber); @@ -1951,17 +2122,14 @@ bool Table::updateCommittedTable() { // Update the last transaction that was updated if we can if (commit->getTransactionSequenceNumber() != -1) { - int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()); - - // Update the last transaction sequence number that the arbitrator arbitrated on - if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) { + // Update the last transaction sequence number that the arbitrator arbitrated on1 + if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) { lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber()); } } // Update the last arbitration data that we have seen so far - if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId()) != NULL) { - + if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(commit->getMachineId())) { int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId()); if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) { // Is larger @@ -1979,9 +2147,8 @@ bool Table::updateCommittedTable() { // Update the last transaction that was updated if we can if (commit->getTransactionSequenceNumber() != -1) { int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()); - - // Update the last transaction sequence number that the arbitrator arbitrated on - if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) { + if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || + lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) { lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber()); } } @@ -1994,31 +2161,45 @@ bool Table::updateCommittedTable() { // Get what commits should be edited, these are the commits that // have live values for their keys Hashset *commitsToEdit = new Hashset(); - for (KeyValue *kv : commit->getKeyValueUpdateSet()) { - commitsToEdit->add(liveCommitsByKeyTable->get(kv->getKey())); + { + SetIterator *kvit = commit->getKeyValueUpdateSet()->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); + Commit *commit = liveCommitsByKeyTable->get(kv->getKey()); + if (commit != NULL) + commitsToEdit->add(commit); + } + delete kvit; } - commitsToEdit->remove(NULL); // remove NULL since it could be in this set // Update each previous commit that needs to be updated - for (Commit *previousCommit : commitsToEdit) { + SetIterator *commitit = commitsToEdit->iterator(); + while (commitit->hasNext()) { + Commit *previousCommit = commitit->next(); // Only bother with live commits (TODO: Maybe remove this check) if (previousCommit->isLive()) { // Update which keys in the old commits are still live - for (KeyValue *kv : commit->getKeyValueUpdateSet()) { - previousCommit->invalidateKey(kv->getKey()); + { + SetIterator *kvit = commit->getKeyValueUpdateSet()->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); + previousCommit->invalidateKey(kv->getKey()); + } + delete kvit; } // if the commit is now dead then remove it if (!previousCommit->isLive()) { - commitForClientTable->remove(previousCommit); + commitForClientTable->remove(previousCommit->getSequenceNumber()); } } } + delete commitit; // Update the last seen sequence number from this arbitrator - if (lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId()) != NULL) { + if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) { if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) { lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber()); } @@ -2030,12 +2211,18 @@ bool Table::updateCommittedTable() { didProcessANewCommit = true; // Update the committed table of keys and which commit is using which key - for (KeyValue *kv : commit->getKeyValueUpdateSet()) { - committedKeyValueTable->put(kv->getKey(), kv); - liveCommitsByKeyTable->put(kv->getKey(), commit); + { + SetIterator *kvit = commit->getKeyValueUpdateSet()->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); + committedKeyValueTable->put(kv->getKey(), kv); + liveCommitsByKeyTable->put(kv->getKey(), commit); + } + delete kvit; } } } + delete liveit; return didProcessANewCommit; } @@ -2045,15 +2232,22 @@ bool Table::updateCommittedTable() { * and have come from the cloud */ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { - if (liveTransactionBySequenceNumberTable->keySet()->size() == 0) { + if (liveTransactionBySequenceNumberTable->size() == 0) { // There is nothing to speculate on return false; } // Create a list of the transaction sequence numbers and sort them // from oldest to newest - Vector *transactionSequenceNumbersSorted = new Vector(liveTransactionBySequenceNumberTable->keySet()); - Collections->sort(transactionSequenceNumbersSorted); + Vector *transactionSequenceNumbersSorted = new Vector(); + { + SetIterator *trit = getKeyIterator(liveTransactionBySequenceNumberTable); + while (trit->hasNext()) + transactionSequenceNumbersSorted->add(trit->next()); + delete trit; + } + + qsort(transactionSequenceNumbersSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64); bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn; @@ -2068,14 +2262,18 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { speculatedKeyValueTable->clear(); lastTransactionSequenceNumberSpeculatedOn = -1; oldestTransactionSequenceNumberSpeculatedOn = -1; - } // Remember the front of the transaction list oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0); // Find where to start arbitration from - int startIndex = transactionSequenceNumbersSorted->indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1; + uint startIndex = 0; + + for (; startIndex < transactionSequenceNumbersSorted->size(); startIndex++) + if (transactionSequenceNumbersSorted->get(startIndex) == lastTransactionSequenceNumberSpeculatedOn) + break; + startIndex++; if (startIndex >= transactionSequenceNumbersSorted->size()) { // Make sure we are not out of bounds @@ -2085,7 +2283,7 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { Hashset *incompleteTransactionArbitrator = new Hashset(); bool didSkip = true; - for (int i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) { + for (uint i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) { int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i); Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber); @@ -2106,8 +2304,13 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) { // Guard evaluated to true so update the speculative table - for (KeyValue *kv : transaction->getKeyValueUpdateSet()) { - speculatedKeyValueTable->put(kv->getKey(), kv); + { + SetIterator *kvit = transaction->getKeyValueUpdateSet()->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); + speculatedKeyValueTable->put(kv->getKey(), kv); + } + delete kvit; } } } @@ -2140,23 +2343,30 @@ void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOr } // Find where to start arbitration from - int startIndex = pendingTransactionQueue->indexOf(firstPendingTransaction) + 1; + uint startIndex = 0; + + for (; startIndex < pendingTransactionQueue->size(); startIndex++) + if (pendingTransactionQueue->get(startIndex) == firstPendingTransaction) + break; if (startIndex >= pendingTransactionQueue->size()) { // Make sure we are not out of bounds return; } - for (int i = startIndex; i < pendingTransactionQueue->size(); i++) { + for (uint i = startIndex; i < pendingTransactionQueue->size(); i++) { Transaction *transaction = pendingTransactionQueue->get(i); lastPendingTransactionSpeculatedOn = transaction; if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) { // Guard evaluated to true so update the speculative table - for (KeyValue *kv : transaction->getKeyValueUpdateSet()) { + SetIterator *kvit = transaction->getKeyValueUpdateSet()->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv); } + delete kvit; } } } @@ -2166,38 +2376,44 @@ void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOr * transactions that are dead */ void Table::updateLiveTransactionsAndStatus() { - // Go through each of the transactions - for (IteratorEntry > *iter = liveTransactionBySequenceNumberTable->entrySet()->iterator(); iter->hasNext();) { - Transaction *transaction = iter->next()->getValue(); - - // Check if the transaction is dead - int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator()); - if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction->getSequenceNumber())) { - - // Set dead the transaction - transaction->setDead(); - - // Remove the transaction from the live table - iter->remove(); - liveTransactionByTransactionIdTable->remove(transaction->getId()); + { + SetIterator *iter = getKeyIterator(liveTransactionBySequenceNumberTable); + while (iter->hasNext()) { + int64_t key = iter->next(); + Transaction *transaction = liveTransactionBySequenceNumberTable->get(key); + + // Check if the transaction is dead + if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator()) && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator() >= transaction->getSequenceNumber())) { + // Set dead the transaction + transaction->setDead(); + + // Remove the transaction from the live table + iter->remove(); + liveTransactionByTransactionIdTable->remove(transaction->getId()); + } } + delete iter; } // Go through each of the transactions - for (IteratorEntry > *iter = outstandingTransactionStatus->entrySet()->iterator(); iter->hasNext();) { - TransactionStatus *status = iter->next()->getValue(); + { + SetIterator *iter = getKeyIterator(outstandingTransactionStatus); + while (iter->hasNext()) { + int64_t key = iter->next(); + TransactionStatus *status = outstandingTransactionStatus->get(key); - // Check if the transaction is dead - int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()); - if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status->getTransactionSequenceNumber())) { + // Check if the transaction is dead + if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator()) && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) { - // Set committed - status->setStatus(TransactionStatus_StatusCommitted); + // Set committed + status->setStatus(TransactionStatus_StatusCommitted); - // Remove - iter->remove(); + // Remove + iter->remove(); + } } + delete iter; } } @@ -2210,7 +2426,10 @@ void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLo updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet); // Process each entry in the slot - for (Entry *entry : slot->getEntries()) { + Vector *entries = slot->getEntries(); + uint eSize = entries->size(); + for (uint ei = 0; ei < eSize; ei++) { + Entry *entry = entries->get(ei); switch (entry->getType()) { case TypeCommitPart: processEntry((CommitPart *)entry); @@ -2269,7 +2488,7 @@ void Table::processEntry(NewKey *entry) { * seen in this current round of updating the local copy of the block * chain */ -void Table::processEntry(TableStatus entry, int64_t seq) { +void Table::processEntry(TableStatus *entry, int64_t seq) { int newNumSlots = entry->getMaxSlots(); updateCurrMaxSize(newNumSlots); initExpectedSize(seq, newNumSlots); @@ -2314,9 +2533,10 @@ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) { // Create a list of clients to watch until they see this rejected // message entry-> Hashset *deviceWatchSet = new Hashset(); - for (Map->Entry > *lastMessageEntry : lastMessageTable->entrySet()) { + SetIterator *> *iter = getKeyIterator(lastMessageTable); + while (iter->hasNext()) { // Machine ID for the last message entry - int64_t lastMessageEntryMachineId = lastMessageEntry->getKey(); + int64_t lastMessageEntryMachineId = iter->next(); // We've seen it, don't need to continue to watch-> Our next // message will implicitly acknowledge it-> @@ -2324,8 +2544,8 @@ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) { continue; } - Pair lastMessageValue = lastMessageEntry->getValue(); - int64_t entrySequenceNumber = lastMessageValue.getFirst(); + Pair *lastMessageValue = lastMessageTable->get(lastMessageEntryMachineId); + int64_t entrySequenceNumber = lastMessageValue->getFirst(); if (entrySequenceNumber < seq) { // Add this rejected message to the set of messages that this @@ -2336,6 +2556,8 @@ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) { deviceWatchSet->add(lastMessageEntryMachineId); } } + delete iter; + if (deviceWatchSet->isEmpty()) { // This rejected message has been seen by all the clients so entry->setDead(); @@ -2360,19 +2582,21 @@ void Table::processEntry(Abort *entry) { // Abort has not been seen by the client it is for yet so we need to // keep track of it - Abort *previouslySeenAbort = liveAbortTable->put(entry->getAbortId(), entry); + + Abort *previouslySeenAbort = liveAbortTable->put(new Pair(entry->getAbortId()), entry); if (previouslySeenAbort != NULL) { - previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version + previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version } if (entry->getTransactionArbitrator() == localMachineId) { liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry); } - if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId()).getFirst() >= entry->getSequenceNumber())) { + if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) { // The machine already saw this so it is dead entry->setDead(); - liveAbortTable->remove(entry->getAbortId()); + Pair abortid = entry->getAbortId(); + liveAbortTable->remove(&abortid); if (entry->getTransactionArbitrator() == localMachineId) { liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber()); @@ -2381,7 +2605,7 @@ void Table::processEntry(Abort *entry) { } // Update the last arbitration data that we have seen so far - if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator()) != NULL) { + if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(entry->getTransactionArbitrator())) { int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator()); if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) { // Is larger @@ -2393,15 +2617,17 @@ void Table::processEntry(Abort *entry) { } // Set dead a transaction if we can - Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(Pair(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber())); + Pair deadPair = Pair(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber()); + + Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(&deadPair); if (transactionToSetDead != NULL) { liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber()); } // Update the last transaction sequence number that the arbitrator // arbitrated on - int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator()); - if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) { + if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getTransactionArbitrator()) || + (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator()) < entry->getTransactionSequenceNumber())) { // Is a valid one if (entry->getTransactionSequenceNumber() != -1) { lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber()); @@ -2416,25 +2642,24 @@ void Table::processEntry(Abort *entry) { void Table::processEntry(TransactionPart *entry) { // Check if we have already seen this transaction and set it dead OR // if it is not alive - int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId()); - if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry->getSequenceNumber())) { + if (lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getArbitratorId()) && (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId()) >= entry->getSequenceNumber())) { // This transaction is dead, it was already committed or aborted entry->setDead(); return; } // This part is still alive - Hashtable, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId()); + Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId()); if (transactionPart == NULL) { // Dont have a table for this machine Id yet so make one - transactionPart = new Hashtable, TransactionPart *>(); + transactionPart = new Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals>(); newTransactionParts->put(entry->getMachineId(), transactionPart); } // Update the part and set dead ones we have already seen (got a // rescued version) - TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry); + TransactionPart *previouslySeenPart = transactionPart->put(new Pair(entry->getPartId()), entry); if (previouslySeenPart != NULL) { previouslySeenPart->setDead(); } @@ -2446,23 +2671,20 @@ void Table::processEntry(TransactionPart *entry) { void Table::processEntry(CommitPart *entry) { // Update the last transaction that was updated if we can if (entry->getTransactionSequenceNumber() != -1) { - int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()); - // Update the last transaction sequence number that the arbitrator - // arbitrated on - if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) { + if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId() || lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber())) { lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber()); } } - Hashtable, CommitPart *> *commitPart = newCommitParts->get(entry->getMachineId()); + Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *commitPart = newCommitParts->get(entry->getMachineId()); if (commitPart == NULL) { // Don't have a table for this machine Id yet so make one - commitPart = new Hashtable, CommitPart *>(); + commitPart = new Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals>(); newCommitParts->put(entry->getMachineId(), commitPart); } // Update the part and set dead ones we have already seen (got a // rescued version) - CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry); + CommitPart *previouslySeenPart = commitPart->put(new Pair(entry->getPartId()), entry); if (previouslySeenPart != NULL) { previouslySeenPart->setDead(); } @@ -2476,7 +2698,7 @@ void Table::processEntry(CommitPart *entry) { * our own last message or that other clients have not had a rollback * on the last message-> */ -void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset *machineSet) { +void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset *machineSet) { // We have seen this machine ID machineSet->remove(machineId); @@ -2486,7 +2708,9 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liven if (watchset != NULL) { // Go through each rejected message that this machine Id has not // seen yet - for (Iterator *rmit = watchset->iterator(); rmit->hasNext(); ) { + + SetIterator *rmit = watchset->iterator(); + while (rmit->hasNext()) { RejectedMessage *rm = rmit->next(); // If this machine Id has seen this rejected message->->-> if (rm->getSequenceNumber() <= seqNum) { @@ -2496,44 +2720,53 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liven rm->removeWatcher(machineId); } } + delete rmit; } // Set dead the abort - for (IteratorEntry, Abort *> > i = liveAbortTable->entrySet()->iterator(); i->hasNext();) { - Abort *abort = i->next()->getValue(); + SetIterator *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> *abortit = getKeyIterator(liveAbortTable); + + while (abortit->hasNext()) { + Pair *key = abortit->next(); + Abort *abort = liveAbortTable->get(key); if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) { abort->setDead(); - i->remove(); + abortit->remove(); if (abort->getTransactionArbitrator() == localMachineId) { liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber()); } } } + delete abortit; if (machineId == localMachineId) { // Our own messages are immediately dead-> - if (liveness instanceof LastMessage) { + char livenessType = liveness->getType(); + if (livenessType == TypeLastMessage) { ((LastMessage *)liveness)->setDead(); - } else if (liveness instanceof Slot) { + } else if (livenessType == TypeSlot) { ((Slot *)liveness)->setDead(); } else { throw new Error("Unrecognized type"); } } // Get the old last message for this device - Pair lastMessageEntry = lastMessageTable->put(machineId, Pair(seqNum, liveness)); + Pair *lastMessageEntry = lastMessageTable->put(machineId, new Pair(seqNum, liveness)); if (lastMessageEntry == NULL) { // If no last message then there is nothing else to process return; } - int64_t lastMessageSeqNum = lastMessageEntry.getFirst(); - Liveness *lastEntry = lastMessageEntry.getSecond(); + int64_t lastMessageSeqNum = lastMessageEntry->getFirst(); + Liveness *lastEntry = lastMessageEntry->getSecond(); + delete lastMessageEntry; // If it is not our machine Id since we already set ours to dead if (machineId != localMachineId) { - if (lastEntry instanceof LastMessage) { + char lastEntryType = lastEntry->getType(); + + if (lastEntryType == TypeLastMessage) { ((LastMessage *)lastEntry)->setDead(); - } else if (lastEntry instanceof Slot) { + } else if (lastEntryType == TypeSlot) { ((Slot *)lastEntry)->setDead(); } else { throw new Error("Unrecognized type"); @@ -2578,7 +2811,7 @@ void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) { * Check if the HMAC chain is not violated */ void Table::checkHMACChain(SlotIndexer *indexer, Array *newSlots) { - for (int i = 0; i < newSlots->length(); i++) { + for (uint i = 0; i < newSlots->length(); i++) { Slot *currSlot = newSlots->get(i); Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1); if (prevSlot != NULL &&