X-Git-Url: http://plrg.eecs.uci.edu/git/?p=iotcloud.git;a=blobdiff_plain;f=version2%2Fsrc%2FC%2FTable.cc;h=531337cc5336f1c69dac6aed68018b2899355c98;hp=22d62a250b63bfab30dd294108f16c3dfb6222c8;hb=b7ed1849727b50e226f3b9d1c432d3071d739368;hpb=18c08ca8d6f4aed6fa24003826304cd5700f7b7d diff --git a/version2/src/C/Table.cc b/version2/src/C/Table.cc index 22d62a2..531337c 100644 --- a/version2/src/C/Table.cc +++ b/version2/src/C/Table.cc @@ -10,8 +10,27 @@ #include "TransactionStatus.h" #include "Transaction.h" #include "LastMessage.h" -#include "Random.h" - +#include "SecureRandom.h" +#include "ByteBuffer.h" +#include "Abort.h" +#include "CommitPart.h" +#include "ArbitrationRound.h" +#include "TransactionPart.h" +#include "Commit.h" +#include "RejectedMessage.h" +#include "SlotIndexer.h" +#include + +int compareInt64(const void *a, const void *b) { + const int64_t *pa = (const int64_t *) a; + const int64_t *pb = (const int64_t *) b; + if (*pa < *pb) + return -1; + else if (*pa > *pb) + return 1; + else + return 0; +} Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) : buffer(NULL), @@ -27,6 +46,7 @@ Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, i oldestLiveSlotSequenceNumver(1), localMachineId(_localMachineId), sequenceNumber(0), + localSequenceNumber(0), localTransactionSequenceNumber(0), lastTransactionSequenceNumberSpeculatedOn(0), oldestTransactionSequenceNumberSpeculatedOn(0), @@ -89,6 +109,7 @@ Table::Table(CloudComm *_cloud, int64_t _localMachineId) : oldestLiveSlotSequenceNumver(1), localMachineId(_localMachineId), sequenceNumber(0), + localSequenceNumber(0), localTransactionSequenceNumber(0), lastTransactionSequenceNumberSpeculatedOn(0), oldestTransactionSequenceNumberSpeculatedOn(0), @@ -137,30 +158,139 @@ Table::Table(CloudComm *_cloud, int64_t _localMachineId) : init(); } +Table::~Table() { + delete cloud; + delete random; + delete buffer; + // init data structs + delete committedKeyValueTable; + delete speculatedKeyValueTable; + delete pendingTransactionSpeculatedKeyValueTable; + delete liveNewKeyTable; + { + SetIterator *> *lmit = getKeyIterator(lastMessageTable); + while (lmit->hasNext()) { + Pair * pair = lastMessageTable->get(lmit->next()); + } + delete lmit; + delete lastMessageTable; + } + if (pendingTransactionBuilder != NULL) + delete pendingTransactionBuilder; + { + SetIterator *> *rmit = getKeyIterator(rejectedMessageWatchVectorTable); + while(rmit->hasNext()) { + int64_t machineid = rmit->next(); + Hashset * rmset = rejectedMessageWatchVectorTable->get(machineid); + SetIterator * mit = rmset->iterator(); + while (mit->hasNext()) { + RejectedMessage * rm = mit->next(); + delete rm; + } + delete mit; + delete rmset; + } + delete rmit; + delete rejectedMessageWatchVectorTable; + } + delete arbitratorTable; + delete liveAbortTable; + { + SetIterator *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts); + while (partsit->hasNext()) { + int64_t machineId = partsit->next(); + Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId); + delete parts; + } + delete partsit; + delete newTransactionParts; + } + { + SetIterator *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts); + while (partsit->hasNext()) { + int64_t machineId = partsit->next(); + Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId); + delete parts; + } + delete partsit; + delete newCommitParts; + } + delete lastArbitratedTransactionNumberByArbitratorTable; + delete liveTransactionBySequenceNumberTable; + delete liveTransactionByTransactionIdTable; + { + SetIterator *> *liveit = getKeyIterator(liveCommitsTable); + while (liveit->hasNext()) { + int64_t arbitratorId = liveit->next(); + + // Get all the commits for a specific arbitrator + Hashtable *commitForClientTable = liveCommitsTable->get(arbitratorId); + { + SetIterator *clientit = getKeyIterator(commitForClientTable); + while (clientit->hasNext()) { + int64_t id = clientit->next(); + delete commitForClientTable->get(id); + } + delete clientit; + } + + delete commitForClientTable; + } + delete liveit; + delete liveCommitsTable; + } + delete liveCommitsByKeyTable; + delete lastCommitSeenSequenceNumberByArbitratorTable; + delete rejectedSlotVector; + { + uint size = pendingTransactionQueue->size(); + for (uint iter = 0; iter < size; iter++) { + delete pendingTransactionQueue->get(iter); + } + delete pendingTransactionQueue; + } + delete pendingSendArbitrationEntriesToDelete; + delete transactionPartsSent; + delete outstandingTransactionStatus; + delete liveAbortsGeneratedByLocal; + delete offlineTransactionsCommittedAndAtServer; + delete localCommunicationTable; + delete lastTransactionSeenFromMachineFromServer; + { + for(uint i = 0; i < pendingSendArbitrationRounds->size(); i++) { + delete pendingSendArbitrationRounds->get(i); + } + delete pendingSendArbitrationRounds; + } + if (lastTransactionPartsSent != NULL) + delete lastTransactionPartsSent; + delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator; +} + /** * Init all the stuff needed for for table usage */ void Table::init() { // Init helper objects - random = new Random(); + random = new SecureRandom(); buffer = new SlotBuffer(); // init data structs - committedKeyValueTable = new Hashtable(); - speculatedKeyValueTable = new Hashtable(); - pendingTransactionSpeculatedKeyValueTable = new Hashtable(); - liveNewKeyTable = new Hashtable(); - lastMessageTable = new Hashtable *>(); + committedKeyValueTable = new Hashtable(); + speculatedKeyValueTable = new Hashtable(); + pendingTransactionSpeculatedKeyValueTable = new Hashtable(); + liveNewKeyTable = new Hashtable(); + lastMessageTable = new Hashtable * >(); rejectedMessageWatchVectorTable = new Hashtable * >(); - arbitratorTable = new Hashtable(); - liveAbortTable = new Hashtable *, Abort *>(); - newTransactionParts = new Hashtable *, TransactionPart *> *>(); - newCommitParts = new Hashtable *, CommitPart *> *>(); + arbitratorTable = new Hashtable(); + liveAbortTable = new Hashtable *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>(); + newTransactionParts = new Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>(); + newCommitParts = new Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>(); lastArbitratedTransactionNumberByArbitratorTable = new Hashtable(); liveTransactionBySequenceNumberTable = new Hashtable(); - liveTransactionByTransactionIdTable = new Hashtable *, Transaction *>(); - liveCommitsTable = new Hashtable >(); - liveCommitsByKeyTable = new Hashtable(); + liveTransactionByTransactionIdTable = new Hashtable *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>(); + liveCommitsTable = new Hashtable * >(); + liveCommitsByKeyTable = new Hashtable(); lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable(); rejectedSlotVector = new Vector(); pendingTransactionQueue = new Vector(); @@ -168,13 +298,12 @@ void Table::init() { transactionPartsSent = new Hashtable *>(); outstandingTransactionStatus = new Hashtable(); liveAbortsGeneratedByLocal = new Hashtable(); - offlineTransactionsCommittedAndAtServer = new Hashset *>(); - localCommunicationTable = new Hashtable >(); + offlineTransactionsCommittedAndAtServer = new Hashset *, uintptr_t, 0, pairHashFunction, pairEquals>(); + localCommunicationTable = new Hashtable *>(); lastTransactionSeenFromMachineFromServer = new Hashtable(); pendingSendArbitrationRounds = new Vector(); lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable(); - // Other init stuff numberOfSlots = buffer->capacity(); setResizeThreshold(); @@ -199,10 +328,13 @@ void Table::initTable() { array->set(0, s); // update local block chain validateAndUpdate(array, true); + delete array; } else if (array->length() == 1) { // in case we did push the slot BUT we failed to init it validateAndUpdate(array, true); + delete array; } else { + delete array; throw new Error("Error on initialization"); } } @@ -215,12 +347,13 @@ void Table::rebuild() { // Just pull the latest slots from the server Array *newslots = cloud->getSlots(sequenceNumber + 1); validateAndUpdate(newslots, true); + delete newslots; sendToServer(NULL); updateLiveTransactionsAndStatus(); } void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) { - localCommunicationTable->put(arbitrator, Pair(hostName, portNumber)); + localCommunicationTable->put(arbitrator, new Pair(hostName, portNumber)); } int64_t Table::getArbitrator(IoTString *key) { @@ -228,14 +361,14 @@ int64_t Table::getArbitrator(IoTString *key) { } void Table::close() { - cloud->close(); + cloud->closeCloud(); } IoTString *Table::getCommitted(IoTString *key) { KeyValue *kv = committedKeyValueTable->get(key); if (kv != NULL) { - return kv->getValue(); + return new IoTString(kv->getValue()); } else { return NULL; } @@ -253,7 +386,7 @@ IoTString *Table::getSpeculative(IoTString *key) { } if (kv != NULL) { - return kv->getValue(); + return new IoTString(kv->getValue()); } else { return NULL; } @@ -274,7 +407,7 @@ IoTString *Table::getCommittedAtomic(IoTString *key) { if (kv != NULL) { pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue())); - return kv->getValue(); + return new IoTString(kv->getValue()); } else { pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL)); return NULL; @@ -304,7 +437,7 @@ IoTString *Table::getSpeculativeAtomic(IoTString *key) { if (kv != NULL) { pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue())); - return kv->getValue(); + return new IoTString(kv->getValue()); } else { pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL)); return NULL; @@ -315,16 +448,17 @@ bool Table::update() { try { Array *newSlots = cloud->getSlots(sequenceNumber + 1); validateAndUpdate(newSlots, false); + delete newSlots; sendToServer(NULL); - - updateLiveTransactionsAndStatus(); - return true; } catch (Exception *e) { - for (int64_t m : localCommunicationTable->keySet()) { + SetIterator *> *kit = getKeyIterator(localCommunicationTable); + while (kit->hasNext()) { + int64_t m = kit->next(); updateFromLocal(m); } + delete kit; } return false; @@ -332,11 +466,10 @@ bool Table::update() { bool Table::createNewKey(IoTString *keyName, int64_t machineId) { while (true) { - if (!arbitratorTable->contains(keyName)) { + if (arbitratorTable->contains(keyName)) { // There is already an arbitrator return false; } - NewKey *newKey = new NewKey(NULL, keyName, machineId); if (sendToServer(newKey)) { @@ -348,11 +481,12 @@ bool Table::createNewKey(IoTString *keyName, int64_t machineId) { void Table::startTransaction() { // Create a new transaction, invalidates any old pending transactions. + if (pendingTransactionBuilder != NULL) + delete pendingTransactionBuilder; pendingTransactionBuilder = new PendingTransaction(localMachineId); } -void Table::addKV(IoTString *key, IoTString *value) { - +void Table::put(IoTString *key, IoTString *value) { // Make sure it is a valid key if (!arbitratorTable->contains(key)) { throw new Error("Key not Found."); @@ -365,12 +499,11 @@ void Table::addKV(IoTString *key, IoTString *value) { } // Add the key value to this transaction - KeyValue *kv = new KeyValue(key, value); + KeyValue *kv = new KeyValue(new IoTString(key), new IoTString(value)); pendingTransactionBuilder->addKV(kv); } TransactionStatus *Table::commitTransaction() { - if (pendingTransactionBuilder->getKVUpdates()->size() == 0) { // transaction with no updates will have no effect on the system return new TransactionStatus(TransactionStatus_StatusNoEffect, -1); @@ -392,9 +525,12 @@ TransactionStatus *Table::commitTransaction() { pendingTransactionQueue->add(newTransaction); } else { arbitrateOnLocalTransaction(newTransaction); + delete newTransaction; updateLiveStateFromLocal(); } - + if (pendingTransactionBuilder != NULL) + delete pendingTransactionBuilder; + pendingTransactionBuilder = new PendingTransaction(localMachineId); try { @@ -402,8 +538,11 @@ TransactionStatus *Table::commitTransaction() { } catch (ServerException *e) { Hashset *arbitratorTriedAndFailed = new Hashset(); - for (Iterator *iter = pendingTransactionQueue->iterator(); iter->hasNext(); ) { - Transaction *transaction = iter->next(); + uint size = pendingTransactionQueue->size(); + uint oldindex = 0; + for (uint iter = 0; iter < size; iter++) { + Transaction *transaction = pendingTransactionQueue->get(iter); + pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter)); if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) { // Already contacted this client so ignore all attempts to contact this client @@ -411,20 +550,22 @@ TransactionStatus *Table::commitTransaction() { continue; } - Pair *sendReturn = sendTransactionToLocal(transaction); + Pair sendReturn = sendTransactionToLocal(transaction); - if (sendReturn->getFirst()) { + if (sendReturn.getFirst()) { // Failed to contact over local arbitratorTriedAndFailed->add(transaction->getArbitrator()); } else { // Successful contact or should not contact - if (sendReturn->getSecond()) { + if (sendReturn.getSecond()) { // did arbitrate - iter->remove(); + delete transaction; + oldindex--; } } } + pendingTransactionQueue->setSize(oldindex); } updateLiveStateFromLocal(); @@ -444,233 +585,182 @@ int64_t Table::getLocalSequenceNumber() { return localSequenceNumber; } - -bool lastInsertedNewKey = false; - -bool Table::sendToServer(NewKey *newKey) { - - bool fromRetry = false; - - try { - if (hadPartialSendToServer) { - Array *newSlots = cloud->getSlots(sequenceNumber + 1); - if (newSlots->length() == 0) { - fromRetry = true; - ThreeTuple *> *sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey); - - if (sendSlotsReturn->getFirst()) { - if (newKey != NULL) { - if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) { - newKey = NULL; - } +NewKey * Table::handlePartialSend(NewKey * newKey) { + //Didn't receive acknowledgement for last send + //See if the server has received a newer slot + + Array *newSlots = cloud->getSlots(sequenceNumber + 1); + if (newSlots->length() == 0) { + //Retry sending old slot + bool wasInserted = false; + bool sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey, &wasInserted, &newSlots); + + if (sendSlotsReturn) { + if (newKey != NULL) { + if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) { + newKey = NULL; + } + } + + SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); + transaction->resetServerFailure(); + // Update which transactions parts still need to be sent + transaction->removeSentParts(lastTransactionPartsSent->get(transaction)); + // Add the transaction status to the outstanding list + outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus()); + + // Update the transaction status + transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial); + + // Check if all the transaction parts were successfully + // sent and if so then remove it from pending + if (transaction->didSendAllParts()) { + transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully); + pendingTransactionQueue->remove(transaction); + } + } + delete trit; + } else { + if (checkSend(newSlots, lastSlotAttemptedToSend)) { + if (newKey != NULL) { + if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) { + newKey = NULL; } - - for (Transaction *transaction : lastTransactionPartsSent->keySet()) { + } + + SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); + transaction->resetServerFailure(); + + // Update which transactions parts still need to be sent + transaction->removeSentParts(lastTransactionPartsSent->get(transaction)); + + // Add the transaction status to the outstanding list + outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus()); + + // Update the transaction status + transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial); + + // Check if all the transaction parts were successfully sent and if so then remove it from pending + if (transaction->didSendAllParts()) { + transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully); + pendingTransactionQueue->remove(transaction); + } else { transaction->resetServerFailure(); - - // Update which transactions parts still need to be sent - transaction->removeSentParts(lastTransactionPartsSent->get(transaction)); - - // Add the transaction status to the outstanding list - outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus()); - - // Update the transaction status - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial); - - // Check if all the transaction parts were successfully sent and if so then remove it from pending - if (transaction->didSendAllParts()) { - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully); - pendingTransactionQueue->remove(transaction); - } - } - } else { - - newSlots = sendSlotsReturn->getThird(); - - bool isInserted = false; - for (uint si = 0; si < newSlots->length(); si++) { - Slot *s = newSlots->get(si); - if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) { - isInserted = true; - break; - } - } - - for (uint si = 0; si < newSlots->length(); si++) { - Slot *s = newSlots->get(si); - if (isInserted) { - break; - } - - // Process each entry in the slot - for (Entry *entry : s->getEntries()) { - if (entry->getType() == TypeLastMessage) { - LastMessage *lastMessage = (LastMessage *)entry; - if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) { - isInserted = true; - break; - } - } - } - } - - if (isInserted) { - if (newKey != NULL) { - if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) { - newKey = NULL; - } - } - - for (Transaction *transaction : lastTransactionPartsSent->keySet()) { - transaction->resetServerFailure(); - - // Update which transactions parts still need to be sent - transaction->removeSentParts(lastTransactionPartsSent->get(transaction)); - - // Add the transaction status to the outstanding list - outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus()); - - // Update the transaction status - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial); - - // Check if all the transaction parts were successfully sent and if so then remove it from pending - if (transaction->didSendAllParts()) { - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully); - pendingTransactionQueue->remove(transaction); - } else { - transaction->resetServerFailure(); - // Set the transaction sequence number back to nothing - if (!transaction->didSendAPartToServer()) { - transaction->setSequenceNumber(-1); - } - } + // Set the transaction sequence number back to nothing + if (!transaction->didSendAPartToServer()) { + transaction->setSequenceNumber(-1); } } } - - for (Transaction *transaction : lastTransactionPartsSent->keySet()) { + delete trit; + } + } + + SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); + transaction->resetServerFailure(); + // Set the transaction sequence number back to nothing + if (!transaction->didSendAPartToServer()) { + transaction->setSequenceNumber(-1); + } + } + delete trit; + + if (newSlots->length() != 0) { + // insert into the local block chain + validateAndUpdate(newSlots, true); + } + } else { + if (checkSend(newSlots, lastSlotAttemptedToSend)) { + if (newKey != NULL) { + if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) { + newKey = NULL; + } + } + + SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); + transaction->resetServerFailure(); + + // Update which transactions parts still need to be sent + transaction->removeSentParts(lastTransactionPartsSent->get(transaction)); + + // Add the transaction status to the outstanding list + outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus()); + + // Update the transaction status + transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial); + + // Check if all the transaction parts were successfully sent and if so then remove it from pending + if (transaction->didSendAllParts()) { + transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully); + pendingTransactionQueue->remove(transaction); + } else { transaction->resetServerFailure(); // Set the transaction sequence number back to nothing if (!transaction->didSendAPartToServer()) { transaction->setSequenceNumber(-1); } } - - if (sendSlotsReturn->getThird()->length() != 0) { - // insert into the local block chain - validateAndUpdate(sendSlotsReturn->getThird(), true); - } - // continue; - } else { - bool isInserted = false; - for (uint si = 0; si < newSlots->length(); si++) { - Slot *s = newSlots->get(si); - if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) { - isInserted = true; - break; - } - } - - for (uint si = 0; si < newSlots->length(); si++) { - Slot *s = newSlots->get(si); - if (isInserted) { - break; - } - - // Process each entry in the slot - for (Entry *entry : s->getEntries()) { - - if (entry->getType() == TypeLastMessage) { - LastMessage *lastMessage = (LastMessage *)entry; - if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) { - isInserted = true; - break; - } - } - } - } - - if (isInserted) { - if (newKey != NULL) { - if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) { - newKey = NULL; - } - } - - for (Transaction *transaction : lastTransactionPartsSent->keySet()) { - transaction->resetServerFailure(); - - // Update which transactions parts still need to be sent - transaction->removeSentParts(lastTransactionPartsSent->get(transaction)); - - // Add the transaction status to the outstanding list - outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus()); - - // Update the transaction status - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial); - - // Check if all the transaction parts were successfully sent and if so then remove it from pending - if (transaction->didSendAllParts()) { - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully); - pendingTransactionQueue->remove(transaction); - } else { - transaction->resetServerFailure(); - // Set the transaction sequence number back to nothing - if (!transaction->didSendAPartToServer()) { - transaction->setSequenceNumber(-1); - } - } - } - } else { - for (Transaction *transaction : lastTransactionPartsSent->keySet()) { - transaction->resetServerFailure(); - // Set the transaction sequence number back to nothing - if (!transaction->didSendAPartToServer()) { - transaction->setSequenceNumber(-1); - } - } + } + delete trit; + } else { + SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); + transaction->resetServerFailure(); + // Set the transaction sequence number back to nothing + if (!transaction->didSendAPartToServer()) { + transaction->setSequenceNumber(-1); } - - // insert into the local block chain - validateAndUpdate(newSlots, true); } + delete trit; } - } catch (ServerException *e) { - throw e; + + // insert into the local block chain + validateAndUpdate(newSlots, true); } + delete newSlots; + return newKey; +} - +bool Table::sendToServer(NewKey *newKey) { + if (hadPartialSendToServer) { + newKey = handlePartialSend(newKey); + } try { // While we have stuff that needs inserting into the block chain while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) { - - fromRetry = false; - if (hadPartialSendToServer) { throw new Error("Should Be error free"); } - - - + // If there is a new key with same name then end if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) { return false; } // Create the slot - Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer->getSlot(sequenceNumber)->getHMAC(), localSequenceNumber); + Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, new Array(buffer->getSlot(sequenceNumber)->getHMAC()), localSequenceNumber); localSequenceNumber++; // Try to fill the slot with data - ThreeTuple fillSlotsReturn = fillSlot(slot, false, newKey); - bool needsResize = fillSlotsReturn->getFirst(); - int newSize = fillSlotsReturn->getSecond(); - bool insertedNewKey = fillSlotsReturn->getThird(); + int newSize = 0; + bool insertedNewKey = false; + bool needsResize = fillSlot(slot, false, newKey, newSize, insertedNewKey); if (needsResize) { // Reset which transaction to send - for (Transaction *transaction : transactionPartsSent->keySet()) { + SetIterator *> *trit = getKeyIterator(transactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); transaction->resetNextPartToSend(); // Set the transaction sequence number back to nothing @@ -678,13 +768,14 @@ bool Table::sendToServer(NewKey *newKey) { transaction->setSequenceNumber(-1); } } + delete trit; // Clear the sent data since we are trying again pendingSendArbitrationEntriesToDelete->clear(); transactionPartsSent->clear(); // We needed a resize so try again - fillSlot(slot, true, newKey); + fillSlot(slot, true, newKey, newSize, insertedNewKey); } lastSlotAttemptedToSend = slot; @@ -692,35 +783,42 @@ bool Table::sendToServer(NewKey *newKey) { lastInsertedNewKey = insertedNewKey; lastNewSize = newSize; lastNewKey = newKey; - lastTransactionPartsSent = new Hashtable * >(transactionPartsSent); + if (lastTransactionPartsSent != NULL) + delete lastTransactionPartsSent; + lastTransactionPartsSent = transactionPartsSent->clone(); lastPendingSendArbitrationEntriesToDelete = new Vector(pendingSendArbitrationEntriesToDelete); + Array * newSlots = NULL; + bool wasInserted = false; + bool sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL, &wasInserted, &newSlots); - ThreeTuple *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL); - - if (sendSlotsReturn->getFirst()) { - + if (sendSlotsReturn) { // Did insert into the block chain - if (insertedNewKey) { // This slot was what was inserted not a previous slot - // New Key was successfully inserted into the block chain so dont want to insert it again newKey = NULL; } // Remove the aborts and commit parts that were sent from the pending to send queue - for (Iterator *iter = pendingSendArbitrationRounds->iterator(); iter->hasNext(); ) { - ArbitrationRound *round = iter->next(); + uint size = pendingSendArbitrationRounds->size(); + uint oldcount = 0; + for (uint i = 0; i < size; i++) { + ArbitrationRound *round = pendingSendArbitrationRounds->get(i); round->removeParts(pendingSendArbitrationEntriesToDelete); - if (round->isDoneSending()) { - // Sent all the parts - iter->remove(); - } + if (!round->isDoneSending()) { + //Add part back in + pendingSendArbitrationRounds->set(oldcount++, + pendingSendArbitrationRounds->get(i)); + } else + delete pendingSendArbitrationRounds->get(i); } + pendingSendArbitrationRounds->setSize(oldcount); - for (Transaction *transaction : transactionPartsSent->keySet()) { + SetIterator *> *trit = getKeyIterator(transactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); transaction->resetServerFailure(); // Update which transactions parts still need to be sent @@ -738,9 +836,12 @@ bool Table::sendToServer(NewKey *newKey) { pendingTransactionQueue->remove(transaction); } } + delete trit; } else { // Reset which transaction to send - for (Transaction *transaction : transactionPartsSent->keySet()) { + SetIterator *> *trit = getKeyIterator(transactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); transaction->resetNextPartToSend(); // Set the transaction sequence number back to nothing @@ -748,23 +849,25 @@ bool Table::sendToServer(NewKey *newKey) { transaction->setSequenceNumber(-1); } } + delete trit; } // Clear the sent data in preparation for next send pendingSendArbitrationEntriesToDelete->clear(); transactionPartsSent->clear(); - if (sendSlotsReturn->getThird()->length() != 0) { + if (newSlots->length() != 0) { // insert into the local block chain - validateAndUpdate(sendSlotsReturn->getThird(), true); + validateAndUpdate(newSlots, true); } + delete newSlots; } - } catch (ServerException *e) { - - if (e->getType() != ServerException->TypeInputTimeout) { + if (e->getType() != ServerException_TypeInputTimeout) { // Nothing was able to be sent to the server so just clear these data structures - for (Transaction *transaction : transactionPartsSent->keySet()) { + SetIterator *> *trit = getKeyIterator(transactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); transaction->resetNextPartToSend(); // Set the transaction sequence number back to nothing @@ -772,15 +875,19 @@ bool Table::sendToServer(NewKey *newKey) { transaction->setSequenceNumber(-1); } } + delete trit; } else { // There was a partial send to the server hadPartialSendToServer = true; // Nothing was able to be sent to the server so just clear these data structures - for (Transaction *transaction : transactionPartsSent->keySet()) { + SetIterator *> *trit = getKeyIterator(transactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); transaction->resetNextPartToSend(); transaction->setServerFailure(); } + delete trit; } pendingSendArbitrationEntriesToDelete->clear(); @@ -793,17 +900,16 @@ bool Table::sendToServer(NewKey *newKey) { } bool Table::updateFromLocal(int64_t machineId) { - Pair localCommunicationInformation = localCommunicationTable->get(machineId); - if (localCommunicationInformation == NULL) { - // Cant talk to that device locally so do nothing + if (!localCommunicationTable->contains(machineId)) return false; - } + + Pair *localCommunicationInformation = localCommunicationTable->get(machineId); // Get the size of the send data int sendDataSize = sizeof(int32_t) + sizeof(int64_t); int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1; - if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId) != NULL) { + if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(machineId)) { lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId); } @@ -830,10 +936,10 @@ bool Table::updateFromLocal(int64_t machineId) { for (int i = 0; i < numberOfEntries; i++) { char type = bbDecode->get(); if (type == TypeAbort) { - Abort *abort = (Abort)Abort_decode(NULL, bbDecode); + Abort *abort = (Abort *)Abort_decode(NULL, bbDecode); processEntry(abort); } else if (type == TypeCommitPart) { - CommitPart *commitPart = (CommitPart)CommitPart_decode(NULL, bbDecode); + CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode); processEntry(commitPart); } } @@ -846,43 +952,50 @@ bool Table::updateFromLocal(int64_t machineId) { Pair Table::sendTransactionToLocal(Transaction *transaction) { // Get the devices local communications - Pair localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator()); + if (!localCommunicationTable->contains(transaction->getArbitrator())) + return Pair(true, false); - if (localCommunicationInformation == NULL) { - // Cant talk to that device locally so do nothing - return new Pair(true, false); - } + Pair *localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator()); // Get the size of the send data int sendDataSize = sizeof(int32_t) + sizeof(int64_t); - for (TransactionPart *part : transaction->getParts()->values()) { - sendDataSize += part->getSize(); + { + Vector *tParts = transaction->getParts(); + uint tPartsSize = tParts->size(); + for (uint i = 0; i < tPartsSize; i++) { + TransactionPart *part = tParts->get(i); + sendDataSize += part->getSize(); + } } int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1; - if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator()) != NULL) { + if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(transaction->getArbitrator())) { lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator()); } // Make the send data size Array *sendData = new Array(sendDataSize); - ByteBuffer *bbEncode = ByteBuffer.wrap(sendData); + ByteBuffer *bbEncode = ByteBuffer_wrap(sendData); // Encode the data bbEncode->putLong(lastArbitrationDataLocalSequenceNumber); bbEncode->putInt(transaction->getParts()->size()); - for (TransactionPart *part : transaction->getParts()->values()) { - part->encode(bbEncode); + { + Vector *tParts = transaction->getParts(); + uint tPartsSize = tParts->size(); + for (uint i = 0; i < tPartsSize; i++) { + TransactionPart *part = tParts->get(i); + part->encode(bbEncode); + } } - // Send by local Array *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond()); localSequenceNumber++; if (returnData == NULL) { // Could not contact server - return new Pair(true, false); + return Pair(true, false); } // Decode the data @@ -895,7 +1008,7 @@ Pair Table::sendTransactionToLocal(Transaction *transaction) { for (int i = 0; i < numberOfEntries; i++) { char type = bbDecode->get(); if (type == TypeAbort) { - Abort *abort = (Abort)Abort_decode(NULL, bbDecode); + Abort *abort = (Abort *)Abort_decode(NULL, bbDecode); if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) { foundAbort = true; @@ -903,7 +1016,7 @@ Pair Table::sendTransactionToLocal(Transaction *transaction) { processEntry(abort); } else if (type == TypeCommitPart) { - CommitPart *commitPart = (CommitPart)CommitPart_decode(NULL, bbDecode); + CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode); processEntry(commitPart); } } @@ -911,14 +1024,14 @@ Pair Table::sendTransactionToLocal(Transaction *transaction) { updateLiveStateFromLocal(); if (couldArbitrate) { - TransactionStatus status = transaction->getTransactionStatus(); + TransactionStatus *status = transaction->getTransactionStatus(); if (didCommit) { status->setStatus(TransactionStatus_StatusCommitted); } else { status->setStatus(TransactionStatus_StatusAborted); } } else { - TransactionStatus status = transaction->getTransactionStatus(); + TransactionStatus *status = transaction->getTransactionStatus(); if (foundAbort) { status->setStatus(TransactionStatus_StatusAborted); } else { @@ -926,11 +1039,10 @@ Pair Table::sendTransactionToLocal(Transaction *transaction) { } } - return new Pair(false, true); + return Pair(false, true); } Array *Table::acceptDataFromLocal(Array *data) { - // Decode the data ByteBuffer *bbDecode = ByteBuffer_wrap(data); int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong(); @@ -946,20 +1058,20 @@ Array *Table::acceptDataFromLocal(Array *data) { Transaction *transaction = new Transaction(); for (int i = 0; i < numberOfParts; i++) { bbDecode->get(); - TransactionPart *newPart = (TransactionPart)TransactionPart.decode(NULL, bbDecode); + TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode); transaction->addPartDecode(newPart); } // Arbitrate on transaction and pull relevant return data Pair localArbitrateReturn = arbitrateOnLocalTransaction(transaction); - couldArbitrate = localArbitrateReturn->getFirst(); - didCommit = localArbitrateReturn->getSecond(); + couldArbitrate = localArbitrateReturn.getFirst(); + didCommit = localArbitrateReturn.getSecond(); updateLiveStateFromLocal(); // Transaction was sent to the server so keep track of it to prevent double commit if (transaction->getSequenceNumber() != -1) { - offlineTransactionsCommittedAndAtServer->add(transaction->getId()); + offlineTransactionsCommittedAndAtServer->add(new Pair(transaction->getId())); } } @@ -968,9 +1080,19 @@ Array *Table::acceptDataFromLocal(Array *data) { Vector *unseenArbitrations = new Vector(); // Get the aborts to send back - Vector *abortLocalSequenceNumbers = new Vector(liveAbortsGeneratedByLocal->keySet()); - Collections->sort(abortLocalSequenceNumbers); - for (int64_t localSequenceNumber : abortLocalSequenceNumbers) { + Vector *abortLocalSequenceNumbers = new Vector(); + { + SetIterator *abortit = getKeyIterator(liveAbortsGeneratedByLocal); + while (abortit->hasNext()) + abortLocalSequenceNumbers->add(abortit->next()); + delete abortit; + } + + qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64); + + uint asize = abortLocalSequenceNumbers->size(); + for (uint i = 0; i < asize; i++) { + int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i); if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) { continue; } @@ -983,20 +1105,32 @@ Array *Table::acceptDataFromLocal(Array *data) { // Get the commits to send back Hashtable *commitForClientTable = liveCommitsTable->get(localMachineId); if (commitForClientTable != NULL) { - Vector *commitLocalSequenceNumbers = new Vector(commitForClientTable->keySet()); - Collections->sort(commitLocalSequenceNumbers); - - for (int64_t localSequenceNumber : commitLocalSequenceNumbers) { + Vector *commitLocalSequenceNumbers = new Vector(); + { + SetIterator *commitit = getKeyIterator(commitForClientTable); + while (commitit->hasNext()) + commitLocalSequenceNumbers->add(commitit->next()); + delete commitit; + } + qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64); + + uint clsSize = commitLocalSequenceNumbers->size(); + for (uint clsi = 0; clsi < clsSize; clsi++) { + int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi); Commit *commit = commitForClientTable->get(localSequenceNumber); if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) { continue; } - unseenArbitrations->addAll(commit->getParts()->values()); - - for (CommitPart commitPart : commit->getParts()->values()) { - returnDataSize += commitPart->getSize(); + { + Vector *parts = commit->getParts(); + uint nParts = parts->size(); + for (uint i = 0; i < nParts; i++) { + CommitPart *commitPart = parts->get(i); + unseenArbitrations->add(commitPart); + returnDataSize += commitPart->getSize(); + } } } } @@ -1027,88 +1161,94 @@ Array *Table::acceptDataFromLocal(Array *data) { } bbEncode->putInt(unseenArbitrations->size()); - for (Entry *entry : unseenArbitrations) { + uint size = unseenArbitrations->size(); + for (uint i = 0; i < size; i++) { + Entry *entry = unseenArbitrations->get(i); entry->encode(bbEncode); } - localSequenceNumber++; return returnData; } -ThreeTuple *> *Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey) { - bool attemptedToSendToServerTmp = attemptedToSendToServer; - attemptedToSendToServer = true; +/** Checks whether a given slot was sent using new slots in + array. Returns true if sent and false otherwise. */ - bool inserted = false; - bool lastTryInserted = false; - - Array *array = cloud->putSlot(slot, newSize); - if (array == NULL) { - array = new Array(); - array->set(0, slot); - rejectedSlotVector->clear(); - inserted = true; - } else { - if (array->length() == 0) { - throw new Error("Server Error: Did not send any slots"); +bool Table::checkSend(Array * array, Slot *checkSlot) { + uint size = array->length(); + for (uint i = 0; i < size; i++) { + Slot *s = array->get(i); + if ((s->getSequenceNumber() == checkSlot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) { + return true; } - - // if (attemptedToSendToServerTmp) { - if (hadPartialSendToServer) { - - bool isInserted = false; - for (Slot *s : array) { - if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) { - isInserted = true; - break; + } + + //Also need to see if other machines acknowledged our message + for (uint i = 0; i < size; i++) { + Slot *s = array->get(i); + + // Process each entry in the slot + Vector *entries = s->getEntries(); + uint eSize = entries->size(); + for (uint ei = 0; ei < eSize; ei++) { + Entry *entry = entries->get(ei); + + if (entry->getType() == TypeLastMessage) { + LastMessage *lastMessage = (LastMessage *)entry; + + if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == checkSlot->getSequenceNumber())) { + return true; } } + } + } + //Not found + return false; +} - for (Slot *s : array) { - if (isInserted) { - break; - } +/** Method tries to send slot to server. Returns status in tuple. + isInserted returns whether last un-acked send (if any) was + successful. Returns whether send was confirmed.x + */ - // Process each entry in the slot - for (Entry *entry : s->getEntries()) { +bool Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey, bool *isInserted, Array **array) { + attemptedToSendToServer = true; - if (entry->getType() == TypeLastMessage) { - LastMessage *lastMessage = (LastMessage *)entry; + *array = cloud->putSlot(slot, newSize); + if (*array == NULL) { + *array = new Array(1); + (*array)->set(0, slot); + rejectedSlotVector->clear(); + *isInserted = false; + return true; + } else { + if ((*array)->length() == 0) { + throw new Error("Server Error: Did not send any slots"); + } - if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) { - isInserted = true; - break; - } - } - } - } + if (hadPartialSendToServer) { + *isInserted = checkSend(*array, slot); - if (!isInserted) { + if (!(*isInserted)) { rejectedSlotVector->add(slot->getSequenceNumber()); - lastTryInserted = false; - } else { - lastTryInserted = true; } + + return false; } else { rejectedSlotVector->add(slot->getSequenceNumber()); - lastTryInserted = false; + *isInserted = false; + return false; } } - - return new ThreeTuple *>(inserted, lastTryInserted, array); } /** - * Returns false if a resize was needed + * Returns true if a resize was needed but not done. */ -ThreeTuple *Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) { - - - int newSize = 0; +bool Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry, int & newSize, bool & insertedKey) { + newSize = 0;//special value to indicate no resize if (liveSlotCount > bufferResizeThreshold) { - resize = true; //Resize is forced - + resize = true;//Resize is forced } if (resize) { @@ -1124,39 +1264,41 @@ ThreeTuple *Table::fillSlot(Slot *slot, bool resize, NewKey ThreeTuple mandatoryRescueReturn = doMandatoryResuce(slot, resize); // Extract working variables - bool needsResize = mandatoryRescueReturn->getFirst(); - bool seenLiveSlot = mandatoryRescueReturn->getSecond(); - int64_t currentRescueSequenceNumber = mandatoryRescueReturn->getThird(); + bool needsResize = mandatoryRescueReturn.getFirst(); + bool seenLiveSlot = mandatoryRescueReturn.getSecond(); + int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird(); if (needsResize && !resize) { - // We need to resize but we are not resizing so return false - return new ThreeTuple(true, NULL, NULL); + // We need to resize but we are not resizing so return true to force on retry + return true; } - bool inserted = false; + insertedKey = false; if (newKeyEntry != NULL) { newKeyEntry->setSlot(slot); if (slot->hasSpace(newKeyEntry)) { - slot->addEntry(newKeyEntry); - inserted = true; + insertedKey = true; } } // Clear the transactions, aborts and commits that were sent previously transactionPartsSent->clear(); pendingSendArbitrationEntriesToDelete->clear(); - - for (ArbitrationRound *round : pendingSendArbitrationRounds) { + uint size = pendingSendArbitrationRounds->size(); + for (uint i = 0; i < size; i++) { + ArbitrationRound *round = pendingSendArbitrationRounds->get(i); bool isFull = false; round->generateParts(); Vector *parts = round->getParts(); // Insert pending arbitration data - for (Entry *arbitrationData : parts) { + uint vsize = parts->size(); + for (uint vi = 0; vi < vsize; vi++) { + Entry *arbitrationData = parts->get(vi); // If it is an abort then we need to set some information - if (arbitrationData instanceof Abort) { + if (arbitrationData->getType() == TypeAbort) { ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber()); } @@ -1177,22 +1319,14 @@ ThreeTuple *Table::fillSlot(Slot *slot, bool resize, NewKey } if (pendingTransactionQueue->size() > 0) { - Transaction *transaction = pendingTransactionQueue->get(0); - // Set the transaction sequence number if it has yet to be inserted into the block chain - // if ((!transaction->didSendAPartToServer() && !transaction->getServerFailure()) || (transaction->getSequenceNumber() == -1)) { - // transaction->setSequenceNumber(slot->getSequenceNumber()); - // } - if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) { transaction->setSequenceNumber(slot->getSequenceNumber()); } - while (true) { TransactionPart *part = transaction->getNextPartToSend(); - if (part == NULL) { // Ran out of parts to send for this transaction so move on break; @@ -1216,7 +1350,7 @@ ThreeTuple *Table::fillSlot(Slot *slot, bool resize, NewKey // Fill the remainder of the slot with rescue data doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize); - return new ThreeTuple(false, newSize, inserted); + return false; } void Table::doRejectedMessages(Slot *s) { @@ -1225,14 +1359,14 @@ void Table::doRejectedMessages(Slot *s) { * there is already a sufficient entry in the queue (e->g->, * equalsto value of true and same sequence number)-> */ - int64_t old_seqn = rejectedSlotVector->firstElement(); + int64_t old_seqn = rejectedSlotVector->get(0); if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) { int64_t new_seqn = rejectedSlotVector->lastElement(); RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false); s->addEntry(rm); } else { int64_t prev_seqn = -1; - int i = 0; + uint i = 0; /* Go through list of missing messages */ for (; i < rejectedSlotVector->size(); i++) { int64_t curr_seqn = rejectedSlotVector->get(i); @@ -1273,7 +1407,7 @@ ThreeTuple Table::doMandatoryResuce(Slot *slot, bool resize // Mandatory Rescue for (; currentSequenceNumber < threshold; currentSequenceNumber++) { - Slot previousSlot = buffer->getSlot(currentSequenceNumber); + Slot *previousSlot = buffer->getSlot(currentSequenceNumber); // Push slot number forward if (!seenLiveSlot) { oldestLiveSlotSequenceNumver = currentSequenceNumber; @@ -1290,22 +1424,21 @@ ThreeTuple Table::doMandatoryResuce(Slot *slot, bool resize Vector *liveEntries = previousSlot->getLiveEntries(resize); // Iterate over all the live entries and try to rescue them - for (Entry *liveEntry : liveEntries) { + uint lESize = liveEntries->size(); + for (uint i = 0; i < lESize; i++) { + Entry *liveEntry = liveEntries->get(i); if (slot->hasSpace(liveEntry)) { - // Enough space to rescue the entry slot->addEntry(liveEntry); } else if (currentSequenceNumber == firstIfFull) { //if there's no space but the entry is about to fall off the queue - System->out->println("B"); //? - return new ThreeTuple(true, seenLiveSlot, currentSequenceNumber); - + return ThreeTuple(true, seenLiveSlot, currentSequenceNumber); } } } // Did not resize - return new ThreeTuple(false, seenLiveSlot, currentSequenceNumber); + return ThreeTuple(false, seenLiveSlot, currentSequenceNumber); } void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) { @@ -1314,7 +1447,6 @@ void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resi * for SKIP_THRESHOLD consecutive entries*/ int skipcount = 0; int64_t newestseqnum = buffer->getNewestSeqNum(); -search: for (; seqn <= newestseqnum; seqn++) { Slot *prevslot = buffer->getSlot(seqn); //Push slot number forward @@ -1325,81 +1457,105 @@ search: continue; seenliveslot = true; Vector *liveentries = prevslot->getLiveEntries(resize); - for (Entry *liveentry : liveentries) { + uint lESize = liveentries->size(); + for (uint i = 0; i < lESize; i++) { + Entry *liveentry = liveentries->get(i); if (s->hasSpace(liveentry)) s->addEntry(liveentry); else { skipcount++; - if (skipcount > Table_SKIP_THRESHOLD) - break search; + if (skipcount > Table_SKIP_THRESHOLD) { + delete liveentries; + goto donesearch; + } } } + delete liveentries; } +donesearch: + ; } /** * Checks for malicious activity and updates the local copy of the block chain-> */ void Table::validateAndUpdate(Array *newSlots, bool acceptUpdatesToLocal) { - - // The cloud communication layer has checked slot HMACs already before decoding + // The cloud communication layer has checked slot HMACs already + // before decoding if (newSlots->length() == 0) { return; } - // Make sure all slots are newer than the last largest slot this client has seen - int64_t firstSeqNum = newSlots[0]->getSequenceNumber(); + // Make sure all slots are newer than the last largest slot this + // client has seen + int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber(); if (firstSeqNum <= sequenceNumber) { throw new Error("Server Error: Sent older slots!"); } - // Create an object that can access both new slots and slots in our local chain - // without committing slots to our local chain + // Create an object that can access both new slots and slots in our + // local chain without committing slots to our local chain SlotIndexer *indexer = new SlotIndexer(newSlots, buffer); // Check that the HMAC chain is not broken checkHMACChain(indexer, newSlots); // Set to keep track of messages from clients - Hashset *machineSet = new Hashset(lastMessageTable->keySet()); - - // Process each slots data - for (Slot *slot : newSlots) { - processSlot(indexer, slot, acceptUpdatesToLocal, machineSet); - - updateExpectedSize(); + Hashset *machineSet = new Hashset(); + { + SetIterator *> *lmit = getKeyIterator(lastMessageTable); + while (lmit->hasNext()) + machineSet->add(lmit->next()); + delete lmit; } - // If there is a gap, check to see if the server sent us everything-> + // Process each slots data + { + uint numSlots = newSlots->length(); + for (uint i = 0; i < numSlots; i++) { + Slot *slot = newSlots->get(i); + processSlot(indexer, slot, acceptUpdatesToLocal, machineSet); + updateExpectedSize(); + } + } + delete indexer; + + // If there is a gap, check to see if the server sent us + // everything-> if (firstSeqNum != (sequenceNumber + 1)) { // Check the size of the slots that were sent down by the server-> // Can only check the size if there was a gap - checkNumSlots(newSlots->length); + checkNumSlots(newSlots->length()); - // Since there was a gap every machine must have pushed a slot or must have - // a last message message-> If not then the server is hiding slots + // Since there was a gap every machine must have pushed a slot or + // must have a last message message-> If not then the server is + // hiding slots if (!machineSet->isEmpty()) { - throw new Error("Missing record for machines: " + machineSet); + delete machineSet; + throw new Error("Missing record for machines: "); } } - + delete machineSet; // Update the size of our local block chain-> commitNewMaxSize(); // Commit new to slots to the local block chain-> - for (Slot *slot : newSlots) { + { + uint numSlots = newSlots->length(); + for (uint i = 0; i < numSlots; i++) { + Slot *slot = newSlots->get(i); - // Insert this slot into our local block chain copy-> - buffer->putSlot(slot); + // Insert this slot into our local block chain copy-> + buffer->putSlot(slot); - // Keep track of how many slots are currently live (have live data in them)-> - liveSlotCount++; + // Keep track of how many slots are currently live (have live data + // in them)-> + liveSlotCount++; + } } - // Get the sequence number of the latest slot in the system - sequenceNumber = newSlots[newSlots->length() - 1]->getSequenceNumber(); - + sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber(); updateLiveStateFromServer(); // No Need to remember after we pulled from the server @@ -1440,23 +1596,13 @@ void Table::updateLiveStateFromLocal() { } void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) { - // if (didFindTableStatus) { - // return; - // } int64_t prevslots = firstSequenceNumber; - if (didFindTableStatus) { - // expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : expectedsize; - // System->out->println("Here2: " + expectedsize + " " + numberOfSlots + " " + prevslots); - } else { expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots; - // System->out->println("Here: " + expectedsize); } - // System->out->println(numberOfSlots); - didFindTableStatus = true; currMaxSize = numberOfSlots; } @@ -1471,22 +1617,18 @@ void Table::updateExpectedSize() { /** - * Check the size of the block chain to make sure there are enough slots sent back by the server-> - * This is only called when we have a gap between the slots that we have locally and the slots - * sent by the server therefore in the slots sent by the server there will be at least 1 Table - * status message + * Check the size of the block chain to make sure there are enough + * slots sent back by the server-> This is only called when we have a + * gap between the slots that we have locally and the slots sent by + * the server therefore in the slots sent by the server there will be + * at least 1 Table status message */ void Table::checkNumSlots(int numberOfSlots) { if (numberOfSlots != expectedsize) { - throw new Error("Server Error: Server did not send all slots-> Expected: " + expectedsize + " Received:" + numberOfSlots); + throw new Error("Server Error: Server did not send all slots-> Expected: "); } } -void Table::updateCurrMaxSize(int newmaxsize) { - currMaxSize = newmaxsize; -} - - /** * Update the size of of the local buffer if it is needed-> */ @@ -1501,13 +1643,14 @@ void Table::commitNewMaxSize() { // Change the number of local slots to the new size numberOfSlots = (int32_t)currMaxSize; - - // Recalculate the resize threshold since the size of the local buffer has changed + // Recalculate the resize threshold since the size of the local + // buffer has changed setResizeThreshold(); } /** - * Process the new transaction parts from this latest round of slots received from the server + * Process the new transaction parts from this latest round of slots + * received from the server */ void Table::processNewTransactionParts() { @@ -1516,19 +1659,26 @@ void Table::processNewTransactionParts() { return; } - // Iterate through all the machine Ids that we received new parts for - for (int64_t machineId : newTransactionParts->keySet()) { - Hashtable *, TransactionPart *> *parts = newTransactionParts->get(machineId); + // Iterate through all the machine Ids that we received new parts + // for + SetIterator *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *tpit = getKeyIterator(newTransactionParts); + while (tpit->hasNext()) { + int64_t machineId = tpit->next(); + Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId); + SetIterator *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts); // Iterate through all the parts for that machine Id - for (Pair *partId : parts->keySet()) { + while (ptit->hasNext()) { + Pair *partId = ptit->next(); TransactionPart *part = parts->get(partId); - int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId()); - if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= part->getSequenceNumber())) { - // Set dead the transaction part - part->setDead(); - continue; + if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) { + int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId()); + if (lastTransactionNumber >= part->getSequenceNumber()) { + // Set dead the transaction part + part->setDead(); + continue; + } } // Get the transaction object for that sequence number @@ -1537,45 +1687,61 @@ void Table::processNewTransactionParts() { if (transaction == NULL) { // This is a new transaction that we dont have so make a new one transaction = new Transaction(); + + // Add that part to the transaction + transaction->addPartDecode(part); // Insert this new transaction into the live tables liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction); - liveTransactionByTransactionIdTable->put(part->getTransactionId(), transaction); + liveTransactionByTransactionIdTable->put(transaction->getId(), transaction); } - - // Add that part to the transaction - transaction->addPartDecode(part); } + delete ptit; + } + delete tpit; + // Clear all the new transaction parts in preparation for the next + // time the server sends slots + { + SetIterator *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts); + while (partsit->hasNext()) { + int64_t machineId = partsit->next(); + Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId); + delete parts; + } + delete partsit; + newTransactionParts->clear(); } - - // Clear all the new transaction parts in preparation for the next time the server sends slots - newTransactionParts->clear(); } void Table::arbitrateFromServer() { - if (liveTransactionBySequenceNumberTable->size() == 0) { // Nothing to arbitrate on so move on return; } // Get the transaction sequence numbers and sort from oldest to newest - Vector *transactionSequenceNumbers = new Vector(liveTransactionBySequenceNumberTable->keySet()); - Collections->sort(transactionSequenceNumbers); + Vector *transactionSequenceNumbers = new Vector(); + { + SetIterator *trit = getKeyIterator(liveTransactionBySequenceNumberTable); + while (trit->hasNext()) + transactionSequenceNumbers->add(trit->next()); + delete trit; + } + qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64); // Collection of key value pairs that are - Hashtable speculativeTableTmp = new Hashtable(); + Hashtable *speculativeTableTmp = new Hashtable(); // The last transaction arbitrated on int64_t lastTransactionCommitted = -1; Hashset *generatedAborts = new Hashset(); - - for (int64_t transactionSequenceNumber : transactionSequenceNumbers) { + uint tsnSize = transactionSequenceNumbers->size(); + for (uint i = 0; i < tsnSize; i++) { + int64_t transactionSequenceNumber = transactionSequenceNumbers->get(i); Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber); - - - // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction + // Check if this machine arbitrates for this transaction if not + // then we cant arbitrate this transaction if (transaction->getArbitrator() != localMachineId) { continue; } @@ -1589,16 +1755,14 @@ void Table::arbitrateFromServer() { continue; } - if (!transaction->isComplete()) { // Will arbitrate in incorrect order if we continue so just break // Most likely this break; } - // update the largest transaction seen by arbitrator from server - if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) == NULL) { + if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) { lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber()); } else { int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()); @@ -1609,17 +1773,18 @@ void Table::arbitrateFromServer() { if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) { // Guard evaluated as true - // Update the local changes so we can make the commit - for (KeyValue *kv : transaction->getKeyValueUpdateSet()) { + SetIterator *kvit = transaction->getKeyValueUpdateSet()->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); speculativeTableTmp->put(kv->getKey(), kv); } + delete kvit; // Update what the last transaction committed was for use in batch commit lastTransactionCommitted = transactionSequenceNumber; } else { // Guard evaluated was false so create abort - // Create the abort Abort *newAbort = new Abort(NULL, transaction->getClientLocalSequenceNumber(), @@ -1628,7 +1793,6 @@ void Table::arbitrateFromServer() { transaction->getArbitrator(), localArbitrationSequenceNumber); localArbitrationSequenceNumber++; - generatedAborts->add(newAbort); // Insert the abort so we can process @@ -1636,34 +1800,39 @@ void Table::arbitrateFromServer() { } lastSeqNumArbOn = transactionSequenceNumber; - - // liveTransactionBySequenceNumberTable->remove(transactionSequenceNumber); } Commit *newCommit = NULL; // If there is something to commit if (speculativeTableTmp->size() != 0) { - // Create the commit and increment the commit sequence number newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted); localArbitrationSequenceNumber++; // Add all the new keys to the commit - for (KeyValue *kv : speculativeTableTmp->values()) { + SetIterator *spit = getKeyIterator(speculativeTableTmp); + while (spit->hasNext()) { + IoTString *string = spit->next(); + KeyValue *kv = speculativeTableTmp->get(string); newCommit->addKV(kv); } - + delete spit; + // create the commit parts newCommit->createCommitParts(); - // Append all the commit parts to the end of the pending queue waiting for sending to the server - + // Append all the commit parts to the end of the pending queue + // waiting for sending to the server // Insert the commit so we can process it - for (CommitPart *commitPart : newCommit->getParts()->values()) { + Vector *parts = newCommit->getParts(); + uint partsSize = parts->size(); + for (uint i = 0; i < partsSize; i++) { + CommitPart *commitPart = parts->get(i); processEntry(commitPart); } } + delete speculativeTableTmp; if ((newCommit != NULL) || (generatedAborts->size() > 0)) { ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts); @@ -1672,7 +1841,10 @@ void Table::arbitrateFromServer() { if (compactArbitrationData()) { ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); if (newArbitrationRound->getCommit() != NULL) { - for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) { + Vector *parts = newArbitrationRound->getCommit()->getParts(); + uint partsSize = parts->size(); + for (uint i = 0; i < partsSize; i++) { + CommitPart *commitPart = parts->get(i); processEntry(commitPart); } } @@ -1682,54 +1854,64 @@ void Table::arbitrateFromServer() { Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { - // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction + // Check if this machine arbitrates for this transaction if not then + // we cant arbitrate this transaction if (transaction->getArbitrator() != localMachineId) { - return new Pair(false, false); + return Pair(false, false); } if (!transaction->isComplete()) { // Will arbitrate in incorrect order if we continue so just break // Most likely this - return new Pair(false, false); + return Pair(false, false); } if (transaction->getMachineId() != localMachineId) { // dont do this check for local transactions - if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) != NULL) { + if (lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) { if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) { // We've have already seen this from the server - return new Pair(false, false); + return Pair(false, false); } } } if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) { - // Guard evaluated as true - - // Create the commit and increment the commit sequence number + // Guard evaluated as true Create the commit and increment the + // commit sequence number Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1); localArbitrationSequenceNumber++; // Update the local changes so we can make the commit - for (KeyValue *kv : transaction->getKeyValueUpdateSet()) { + SetIterator *kvit = transaction->getKeyValueUpdateSet()->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); newCommit->addKV(kv); } + delete kvit; // create the commit parts newCommit->createCommitParts(); - // Append all the commit parts to the end of the pending queue waiting for sending to the server + // Append all the commit parts to the end of the pending queue + // waiting for sending to the server ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset()); pendingSendArbitrationRounds->add(arbitrationRound); if (compactArbitrationData()) { ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); - for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) { + Vector *parts = newArbitrationRound->getCommit()->getParts(); + uint partsSize = parts->size(); + for (uint i = 0; i < partsSize; i++) { + CommitPart *commitPart = parts->get(i); processEntry(commitPart); } } else { // Insert the commit so we can process it - for (CommitPart *commitPart : newCommit->getParts()->values()) { + Vector *parts = newCommit->getParts(); + uint partsSize = parts->size(); + for (uint i = 0; i < partsSize; i++) { + CommitPart *commitPart = parts->get(i); processEntry(commitPart); } } @@ -1742,21 +1924,18 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { } updateLiveStateFromLocal(); - return new Pair(true, true); + return Pair(true, true); } else { - if (transaction->getMachineId() == localMachineId) { // For locally created messages update the status - // Guard evaluated was false so create abort - TransactionStatus status = transaction->getTransactionStatus(); + TransactionStatus *status = transaction->getTransactionStatus(); if (status != NULL) { status->setStatus(TransactionStatus_StatusAborted); } } else { Hashset *addAbortSet = new Hashset(); - // Create the abort Abort *newAbort = new Abort(NULL, transaction->getClientLocalSequenceNumber(), @@ -1765,67 +1944,71 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { transaction->getArbitrator(), localArbitrationSequenceNumber); localArbitrationSequenceNumber++; - addAbortSet->add(newAbort); - - // Append all the commit parts to the end of the pending queue waiting for sending to the server + // Append all the commit parts to the end of the pending queue + // waiting for sending to the server ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet); pendingSendArbitrationRounds->add(arbitrationRound); if (compactArbitrationData()) { ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); - for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) { + + Vector *parts = newArbitrationRound->getCommit()->getParts(); + uint partsSize = parts->size(); + for (uint i = 0; i < partsSize; i++) { + CommitPart *commitPart = parts->get(i); processEntry(commitPart); } } } updateLiveStateFromLocal(); - return new Pair(true, false); + return Pair(true, false); } } /** - * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates + * Compacts the arbitration data by merging commits and aggregating + * aborts so that a single large push of commits can be done instead + * of many small updates */ bool Table::compactArbitrationData() { - if (pendingSendArbitrationRounds->size() < 2) { // Nothing to compact so do nothing return false; } ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); - if (lastRound->didSendPart()) { + if (lastRound->getDidSendPart()) { return false; } bool hadCommit = (lastRound->getCommit() == NULL); bool gotNewCommit = false; - int numberToDelete = 1; + uint numberToDelete = 1; + while (numberToDelete < pendingSendArbitrationRounds->size()) { - ArbitrationRound *xs round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1); + ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1); - if (round->isFull() || round->didSendPart()) { - // Stop since there is a part that cannot be compacted and we need to compact in order + if (round->isFull() || round->getDidSendPart()) { + // Stop since there is a part that cannot be compacted and we + // need to compact in order break; } if (round->getCommit() == NULL) { - // Try compacting aborts only int newSize = round->getCurrentSize() + lastRound->getAbortsCount(); - if (newSize > ArbitrationRound->MAX_PARTS) { + if (newSize > ArbitrationRound_MAX_PARTS) { // Cant compact since it would be too large break; } lastRound->addAborts(round->getAborts()); } else { - // Create a new larger commit - Commit newCommit = Commit->merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber); + Commit *newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber); localArbitrationSequenceNumber++; // Create the commit parts so that we can count them @@ -1836,12 +2019,24 @@ bool Table::compactArbitrationData() { newSize += lastRound->getAbortsCount(); newSize += round->getAbortsCount(); - if (newSize > ArbitrationRound->MAX_PARTS) { - // Cant compact since it would be too large + if (newSize > ArbitrationRound_MAX_PARTS) { + // Can't compact since it would be too large + if (lastRound->getCommit() != newCommit && + round->getCommit() != newCommit) + delete newCommit; break; } - // Set the new compacted part + if (lastRound->getCommit() == newCommit) + lastRound->setCommit(NULL); + if (round->getCommit() == newCommit) + round->setCommit(NULL); + + if (lastRound->getCommit() != NULL) { + Commit * oldcommit = lastRound->getCommit(); + lastRound->setCommit(NULL); + delete oldcommit; + } lastRound->setCommit(newCommit); lastRound->addAborts(round->getAborts()); gotNewCommit = true; @@ -1852,17 +2047,12 @@ bool Table::compactArbitrationData() { if (numberToDelete != 1) { // If there is a compaction - // Delete the previous pieces that are now in the new compacted piece - if (numberToDelete == pendingSendArbitrationRounds->size()) { - pendingSendArbitrationRounds->clear(); - } else { - for (int i = 0; i < numberToDelete; i++) { - pendingSendArbitrationRounds->remove(pendingSendArbitrationRounds->size() - 1); - } + for (uint i = 2; i <= numberToDelete; i++) { + delete pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size()-i); } + pendingSendArbitrationRounds->setSize(pendingSendArbitrationRounds->size() - numberToDelete); - // Add the new compacted into the pending to send list pendingSendArbitrationRounds->add(lastRound); // Should reinsert into the commit processor @@ -1873,26 +2063,27 @@ bool Table::compactArbitrationData() { return false; } -// bool compactArbitrationData() { -// return false; -// } /** - * Update all the commits and the committed tables, sets dead the dead transactions + * Update all the commits and the committed tables, sets dead the dead + * transactions */ bool Table::updateCommittedTable() { - if (newCommitParts->size() == 0) { // Nothing new to process return false; } // Iterate through all the machine Ids that we received new parts for - for (int64_t machineId : newCommitParts->keySet()) { - Hashtable *, CommitPart *> *parts = newCommitParts->get(machineId); + SetIterator *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts); + while (partsit->hasNext()) { + int64_t machineId = partsit->next(); + Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId); // Iterate through all the parts for that machine Id - for (Pair *partId : parts->keySet()) { + SetIterator *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pairit = getKeyIterator(parts); + while (pairit->hasNext()) { + Pair *partId = pairit->next(); CommitPart *part = parts->get(partId); // Get the transaction object for that sequence number @@ -1917,62 +2108,74 @@ bool Table::updateCommittedTable() { // Add that part to the commit commit->addPartDecode(part); } + delete pairit; + delete parts; } + delete partsit; - // Clear all the new commits parts in preparation for the next time the server sends slots + // Clear all the new commits parts in preparation for the next time + // the server sends slots newCommitParts->clear(); // If we process a new commit keep track of it for future use bool didProcessANewCommit = false; // Process the commits one by one - for (int64_T arbitratorId : liveCommitsTable->keySet()) { - + SetIterator *> *liveit = getKeyIterator(liveCommitsTable); + while (liveit->hasNext()) { + int64_t arbitratorId = liveit->next(); // Get all the commits for a specific arbitrator Hashtable *commitForClientTable = liveCommitsTable->get(arbitratorId); // Sort the commits in order - Vector *commitSequenceNumbers = new Vector(commitForClientTable->keySet()); - Collections->sort(commitSequenceNumbers); + Vector *commitSequenceNumbers = new Vector(); + { + SetIterator *clientit = getKeyIterator(commitForClientTable); + while (clientit->hasNext()) + commitSequenceNumbers->add(clientit->next()); + delete clientit; + } + + qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64); // Get the last commit seen from this arbitrator int64_t lastCommitSeenSequenceNumber = -1; - if (lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId) != NULL) { + if (lastCommitSeenSequenceNumberByArbitratorTable->contains(arbitratorId)) { lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId); } // Go through each new commit one by one - for (int i = 0; i < commitSequenceNumbers->size(); i++) { + for (uint i = 0; i < commitSequenceNumbers->size(); i++) { int64_t commitSequenceNumber = commitSequenceNumbers->get(i); Commit *commit = commitForClientTable->get(commitSequenceNumber); - // Special processing if a commit is not complete if (!commit->isComplete()) { if (i == (commitSequenceNumbers->size() - 1)) { - // If there is an incomplete commit and this commit is the latest one seen then this commit cannot be processed and there are no other commits + // If there is an incomplete commit and this commit is the + // latest one seen then this commit cannot be processed and + // there are no other commits break; } else { - // This is a commit that was already dead but parts of it are still in the block chain (not flushed out yet)-> + // This is a commit that was already dead but parts of it + // are still in the block chain (not flushed out yet)-> // Delete it and move on commit->setDead(); commitForClientTable->remove(commit->getSequenceNumber()); + delete commit; continue; } } // Update the last transaction that was updated if we can if (commit->getTransactionSequenceNumber() != -1) { - int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()); - - // Update the last transaction sequence number that the arbitrator arbitrated on - if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) { + // Update the last transaction sequence number that the arbitrator arbitrated on1 + if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) { lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber()); } } // Update the last arbitration data that we have seen so far - if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId()) != NULL) { - + if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(commit->getMachineId())) { int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId()); if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) { // Is larger @@ -1983,51 +2186,66 @@ bool Table::updateCommittedTable() { lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber()); } - // We have already seen this commit before so need to do the full processing on this commit + // We have already seen this commit before so need to do the + // full processing on this commit if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) { - // Update the last transaction that was updated if we can if (commit->getTransactionSequenceNumber() != -1) { int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()); - - // Update the last transaction sequence number that the arbitrator arbitrated on - if ((lastTransactionNumber == NULL) || (lastTransactionNumber < commit->getTransactionSequenceNumber())) { + if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || + lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) { lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber()); } } - continue; } - // If we got here then this is a brand new commit and needs full processing - - // Get what commits should be edited, these are the commits that have live values for their keys + // If we got here then this is a brand new commit and needs full + // processing + // Get what commits should be edited, these are the commits that + // have live values for their keys Hashset *commitsToEdit = new Hashset(); - for (KeyValue *kv : commit->getKeyValueUpdateSet()) { - commitsToEdit->add(liveCommitsByKeyTable->get(kv->getKey())); + { + SetIterator *kvit = commit->getKeyValueUpdateSet()->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); + Commit *commit = liveCommitsByKeyTable->get(kv->getKey()); + if (commit != NULL) + commitsToEdit->add(commit); + } + delete kvit; } - commitsToEdit->remove(NULL); // remove NULL since it could be in this set // Update each previous commit that needs to be updated - for (Commit *previousCommit : commitsToEdit) { + SetIterator *commitit = commitsToEdit->iterator(); + while (commitit->hasNext()) { + Commit *previousCommit = commitit->next(); // Only bother with live commits (TODO: Maybe remove this check) if (previousCommit->isLive()) { // Update which keys in the old commits are still live - for (KeyValue *kv : commit->getKeyValueUpdateSet()) { - previousCommit->invalidateKey(kv->getKey()); + { + SetIterator *kvit = commit->getKeyValueUpdateSet()->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); + previousCommit->invalidateKey(kv->getKey()); + } + delete kvit; } // if the commit is now dead then remove it if (!previousCommit->isLive()) { - commitForClientTable->remove(previousCommit); + commitForClientTable->remove(previousCommit->getSequenceNumber()); + delete previousCommit; } } } + delete commitit; + delete commitsToEdit; // Update the last seen sequence number from this arbitrator - if (lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId()) != NULL) { + if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) { if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) { lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber()); } @@ -2039,48 +2257,69 @@ bool Table::updateCommittedTable() { didProcessANewCommit = true; // Update the committed table of keys and which commit is using which key - for (KeyValue *kv : commit->getKeyValueUpdateSet()) { - committedKeyValueTable->put(kv->getKey(), kv); - liveCommitsByKeyTable->put(kv->getKey(), commit); + { + SetIterator *kvit = commit->getKeyValueUpdateSet()->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); + committedKeyValueTable->put(kv->getKey(), kv); + liveCommitsByKeyTable->put(kv->getKey(), commit); + } + delete kvit; } } } + delete liveit; return didProcessANewCommit; } /** - * Create the speculative table from transactions that are still live and have come from the cloud + * Create the speculative table from transactions that are still live + * and have come from the cloud */ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { - if (liveTransactionBySequenceNumberTable->keySet()->size() == 0) { + if (liveTransactionBySequenceNumberTable->size() == 0) { // There is nothing to speculate on return false; } - // Create a list of the transaction sequence numbers and sort them from oldest to newest - Vector *transactionSequenceNumbersSorted = new Vector(liveTransactionBySequenceNumberTable->keySet()); - Collections->sort(transactionSequenceNumbersSorted); + // Create a list of the transaction sequence numbers and sort them + // from oldest to newest + Vector *transactionSequenceNumbersSorted = new Vector(); + { + SetIterator *trit = getKeyIterator(liveTransactionBySequenceNumberTable); + while (trit->hasNext()) + transactionSequenceNumbersSorted->add(trit->next()); + delete trit; + } + + qsort(transactionSequenceNumbersSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64); bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn; if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) { - // If there is a gap in the transaction sequence numbers then there was a commit or an abort of a transaction - // OR there was a new commit (Could be from offline commit) so a redo the speculation from scratch + // If there is a gap in the transaction sequence numbers then + // there was a commit or an abort of a transaction OR there was a + // new commit (Could be from offline commit) so a redo the + // speculation from scratch // Start from scratch speculatedKeyValueTable->clear(); lastTransactionSequenceNumberSpeculatedOn = -1; oldestTransactionSequenceNumberSpeculatedOn = -1; - } // Remember the front of the transaction list oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0); // Find where to start arbitration from - int startIndex = transactionSequenceNumbersSorted->indexOf(lastTransactionSequenceNumberSpeculatedOn) + 1; + uint startIndex = 0; + + for (; startIndex < transactionSequenceNumbersSorted->size(); startIndex++) + if (transactionSequenceNumbersSorted->get(startIndex) == lastTransactionSequenceNumberSpeculatedOn) + break; + startIndex++; if (startIndex >= transactionSequenceNumbersSorted->size()) { // Make sure we are not out of bounds @@ -2090,13 +2329,14 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { Hashset *incompleteTransactionArbitrator = new Hashset(); bool didSkip = true; - for (int i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) { + for (uint i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) { int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i); Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber); if (!transaction->isComplete()) { - // If there is an incomplete transaction then there is nothing we can do - // add this transactions arbitrator to the list of arbitrators we should ignore + // If there is an incomplete transaction then there is nothing + // we can do add this transactions arbitrator to the list of + // arbitrators we should ignore incompleteTransactionArbitrator->add(transaction->getArbitrator()); didSkip = true; continue; @@ -2110,8 +2350,13 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) { // Guard evaluated to true so update the speculative table - for (KeyValue *kv : transaction->getKeyValueUpdateSet()) { - speculatedKeyValueTable->put(kv->getKey(), kv); + { + SetIterator *kvit = transaction->getKeyValueUpdateSet()->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); + speculatedKeyValueTable->put(kv->getKey(), kv); + } + delete kvit; } } } @@ -2127,7 +2372,8 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { } /** - * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer + * Create the pending transaction speculative table from transactions + * that are still in the pending transaction buffer */ void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) { if (pendingTransactionQueue->size() == 0) { @@ -2143,63 +2389,79 @@ void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOr } // Find where to start arbitration from - int startIndex = pendingTransactionQueue->indexOf(firstPendingTransaction) + 1; + uint startIndex = 0; + + for (; startIndex < pendingTransactionQueue->size(); startIndex++) + if (pendingTransactionQueue->get(startIndex) == firstPendingTransaction) + break; if (startIndex >= pendingTransactionQueue->size()) { // Make sure we are not out of bounds return; } - for (int i = startIndex; i < pendingTransactionQueue->size(); i++) { + for (uint i = startIndex; i < pendingTransactionQueue->size(); i++) { Transaction *transaction = pendingTransactionQueue->get(i); lastPendingTransactionSpeculatedOn = transaction; if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) { // Guard evaluated to true so update the speculative table - for (KeyValue *kv : transaction->getKeyValueUpdateSet()) { + SetIterator *kvit = transaction->getKeyValueUpdateSet()->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv); } + delete kvit; } } } /** - * Set dead and remove from the live transaction tables the transactions that are dead + * Set dead and remove from the live transaction tables the + * transactions that are dead */ void Table::updateLiveTransactionsAndStatus() { - // Go through each of the transactions - for (IteratorEntry > *iter = liveTransactionBySequenceNumberTable->entrySet()->iterator(); iter->hasNext();) { - Transaction *transaction = iter->next()->getValue(); - - // Check if the transaction is dead - int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator()); - if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= transaction->getSequenceNumber())) { - - // Set dead the transaction - transaction->setDead(); - - // Remove the transaction from the live table - iter->remove(); - liveTransactionByTransactionIdTable->remove(transaction->getId()); + { + SetIterator *iter = getKeyIterator(liveTransactionBySequenceNumberTable); + while (iter->hasNext()) { + int64_t key = iter->next(); + Transaction *transaction = liveTransactionBySequenceNumberTable->get(key); + + // Check if the transaction is dead + if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator()) + && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator()) >= transaction->getSequenceNumber()) { + // Set dead the transaction + transaction->setDead(); + + // Remove the transaction from the live table + iter->remove(); + liveTransactionByTransactionIdTable->remove(transaction->getId()); + delete transaction; + } } + delete iter; } // Go through each of the transactions - for (IteratorEntry > *iter = outstandingTransactionStatus->entrySet()->iterator(); iter->hasNext();) { - TransactionStatus *status = iter->next()->getValue(); - - // Check if the transaction is dead - int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()); - if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= status->getTransactionSequenceNumber())) { - - // Set committed - status->setStatus(TransactionStatus_StatusCommitted); + { + SetIterator *iter = getKeyIterator(outstandingTransactionStatus); + while (iter->hasNext()) { + int64_t key = iter->next(); + TransactionStatus *status = outstandingTransactionStatus->get(key); + + // Check if the transaction is dead + if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator()) + && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) { + // Set committed + status->setStatus(TransactionStatus_StatusCommitted); - // Remove - iter->remove(); + // Remove + iter->remove(); + } } + delete iter; } } @@ -2212,39 +2474,34 @@ void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLo updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet); // Process each entry in the slot - for (Entry *entry : slot->getEntries()) { + Vector *entries = slot->getEntries(); + uint eSize = entries->size(); + for (uint ei = 0; ei < eSize; ei++) { + Entry *entry = entries->get(ei); switch (entry->getType()) { - case TypeCommitPart: processEntry((CommitPart *)entry); break; - case TypeAbort: processEntry((Abort *)entry); break; - case TypeTransactionPart: processEntry((TransactionPart *)entry); break; - case TypeNewKey: processEntry((NewKey *)entry); break; - case TypeLastMessage: processEntry((LastMessage *)entry, machineSet); break; - case TypeRejectedMessage: processEntry((RejectedMessage *)entry, indexer); break; - case TypeTableStatus: processEntry((TableStatus *)entry, slot->getSequenceNumber()); break; - default: - throw new Error("Unrecognized type: " + entry->getType()); + throw new Error("Unrecognized type: "); } } } @@ -2258,10 +2515,10 @@ void Table::processEntry(LastMessage *entry, Hashset *machineSet) { } /** - * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message) + * Add the new key to the arbitrators table and update the set of live + * new keys (in case of a rescued new key message) */ void Table::processEntry(NewKey *entry) { - // Update the arbitrator table with the new key information arbitratorTable->put(entry->getKey(), entry->getMachineID()); @@ -2274,18 +2531,19 @@ void Table::processEntry(NewKey *entry) { } /** - * Process new table status entries and set dead the old ones as new ones come in-> - * keeps track of the largest and smallest table status seen in this current round - * of updating the local copy of the block chain + * Process new table status entries and set dead the old ones as new + * ones come in-> keeps track of the largest and smallest table status + * seen in this current round of updating the local copy of the block + * chain */ -void Table::processEntry(TableStatus entry, int64_t seq) { +void Table::processEntry(TableStatus *entry, int64_t seq) { int newNumSlots = entry->getMaxSlots(); updateCurrMaxSize(newNumSlots); - initExpectedSize(seq, newNumSlots); if (liveTableStatus != NULL) { - // We have a larger table status so the old table status is no int64_ter alive + // We have a larger table status so the old table status is no + // int64_ter alive liveTableStatus->setDead(); } @@ -2294,7 +2552,8 @@ void Table::processEntry(TableStatus entry, int64_t seq) { } /** - * Check old messages to see if there is a block chain violation-> Also + * Check old messages to see if there is a block chain violation-> + * Also */ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) { int64_t oldSeqNum = entry->getOldSeqNum(); @@ -2303,30 +2562,29 @@ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) { int64_t machineId = entry->getMachineID(); int64_t seq = entry->getSequenceNumber(); - - // Check if we have messages that were supposed to be rejected in our local block chain + // Check if we have messages that were supposed to be rejected in + // our local block chain for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) { - // Get the slot Slot *slot = indexer->getSlot(seqNum); if (slot != NULL) { - // If we have this slot make sure that it was not supposed to be a rejected slot - + // If we have this slot make sure that it was not supposed to be + // a rejected slot int64_t slotMachineId = slot->getMachineID(); if (isequal != (slotMachineId == machineId)) { - throw new Error("Server Error: Trying to insert rejected message for slot " + seqNum); + throw new Error("Server Error: Trying to insert rejected message for slot "); } } } - - // Create a list of clients to watch until they see this rejected message entry-> + // Create a list of clients to watch until they see this rejected + // message entry-> Hashset *deviceWatchSet = new Hashset(); - for (Map->Entry *> *lastMessageEntry : lastMessageTable->entrySet()) { - + SetIterator *> *iter = getKeyIterator(lastMessageTable); + while (iter->hasNext()) { // Machine ID for the last message entry - int64_t lastMessageEntryMachineId = lastMessageEntry->getKey(); + int64_t lastMessageEntryMachineId = iter->next(); // We've seen it, don't need to continue to watch-> Our next // message will implicitly acknowledge it-> @@ -2334,22 +2592,24 @@ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) { continue; } - Pair *lastMessageValue = lastMessageEntry->getValue(); + Pair *lastMessageValue = lastMessageTable->get(lastMessageEntryMachineId); int64_t entrySequenceNumber = lastMessageValue->getFirst(); if (entrySequenceNumber < seq) { - - // Add this rejected message to the set of messages that this machine ID did not see yet + // Add this rejected message to the set of messages that this + // machine ID did not see yet addWatchVector(lastMessageEntryMachineId, entry); - - // This client did not see this rejected message yet so add it to the watch set to monitor + // This client did not see this rejected message yet so add it + // to the watch set to monitor deviceWatchSet->add(lastMessageEntryMachineId); } } + delete iter; if (deviceWatchSet->isEmpty()) { // This rejected message has been seen by all the clients so entry->setDead(); + delete deviceWatchSet; } else { // We need to watch this rejected message entry->setWatchSet(deviceWatchSet); @@ -2357,12 +2617,10 @@ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) { } /** - * Check if this abort is live, if not then save it so we can kill it later-> - * update the last transaction number that was arbitrated on-> + * Check if this abort is live, if not then save it so we can kill it + * later-> update the last transaction number that was arbitrated on-> */ void Table::processEntry(Abort *entry) { - - if (entry->getTransactionSequenceNumber() != -1) { // update the transaction status if it was sent to the server TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber()); @@ -2371,10 +2629,12 @@ void Table::processEntry(Abort *entry) { } } - // Abort has not been seen by the client it is for yet so we need to keep track of it - Abort *previouslySeenAbort = liveAbortTable->put(entry->getAbortId(), entry); + // Abort has not been seen by the client it is for yet so we need to + // keep track of it + + Abort *previouslySeenAbort = liveAbortTable->put(new Pair(entry->getAbortId()), entry); if (previouslySeenAbort != NULL) { - previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version + previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version } if (entry->getTransactionArbitrator() == localMachineId) { @@ -2382,21 +2642,19 @@ void Table::processEntry(Abort *entry) { } if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) { - // The machine already saw this so it is dead entry->setDead(); - liveAbortTable->remove(entry->getAbortId()); + Pair abortid = entry->getAbortId(); + liveAbortTable->remove(&abortid); if (entry->getTransactionArbitrator() == localMachineId) { liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber()); } - return; } // Update the last arbitration data that we have seen so far - if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator()) != NULL) { - + if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(entry->getTransactionArbitrator())) { int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator()); if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) { // Is larger @@ -2407,17 +2665,18 @@ void Table::processEntry(Abort *entry) { lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber()); } - // Set dead a transaction if we can - Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(new Pair(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber())); + Pair deadPair = Pair(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber()); + + Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(&deadPair); if (transactionToSetDead != NULL) { liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber()); } - // Update the last transaction sequence number that the arbitrator arbitrated on - int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator()); - if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) { - + // Update the last transaction sequence number that the arbitrator + // arbitrated on + if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getTransactionArbitrator()) || + (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator()) < entry->getTransactionSequenceNumber())) { // Is a valid one if (entry->getTransactionSequenceNumber() != -1) { lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber()); @@ -2426,28 +2685,30 @@ void Table::processEntry(Abort *entry) { } /** - * Set dead the transaction part if that transaction is dead and keep track of all new parts + * Set dead the transaction part if that transaction is dead and keep + * track of all new parts */ void Table::processEntry(TransactionPart *entry) { - // Check if we have already seen this transaction and set it dead OR if it is not alive - int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId()); - if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry->getSequenceNumber())) { + // Check if we have already seen this transaction and set it dead OR + // if it is not alive + if (lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getArbitratorId()) && (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId()) >= entry->getSequenceNumber())) { // This transaction is dead, it was already committed or aborted entry->setDead(); return; } // This part is still alive - Hashtable *, TransactionPart *> *transactionPart = newTransactionParts->get(entry->getMachineId()); + Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId()); if (transactionPart == NULL) { // Dont have a table for this machine Id yet so make one - transactionPart = new Hashtable *, TransactionPart *>(); + transactionPart = new Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals>(); newTransactionParts->put(entry->getMachineId(), transactionPart); } - // Update the part and set dead ones we have already seen (got a rescued version) - TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry); + // Update the part and set dead ones we have already seen (got a + // rescued version) + TransactionPart *previouslySeenPart = transactionPart->put(new Pair(entry->getPartId()), entry); if (previouslySeenPart != NULL) { previouslySeenPart->setDead(); } @@ -2459,26 +2720,20 @@ void Table::processEntry(TransactionPart *entry) { void Table::processEntry(CommitPart *entry) { // Update the last transaction that was updated if we can if (entry->getTransactionSequenceNumber() != -1) { - int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()); - - // Update the last transaction sequence number that the arbitrator arbitrated on - if ((lastTransactionNumber == NULL) || (lastTransactionNumber < entry->getTransactionSequenceNumber())) { + if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId()) || + lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber()) { lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber()); } } - - - - Hashtable *, CommitPart *> *commitPart = newCommitParts->get(entry->getMachineId()); - + Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *commitPart = newCommitParts->get(entry->getMachineId()); if (commitPart == NULL) { // Don't have a table for this machine Id yet so make one - commitPart = new Hashtable *, CommitPart *>(); + commitPart = new Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals>(); newCommitParts->put(entry->getMachineId(), commitPart); } - - // Update the part and set dead ones we have already seen (got a rescued version) + // Update the part and set dead ones we have already seen (got a + // rescued version) CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry); if (previouslySeenPart != NULL) { previouslySeenPart->setDead(); @@ -2486,66 +2741,64 @@ void Table::processEntry(CommitPart *entry) { } /** - * Update the last message seen table-> Update and set dead the appropriate RejectedMessages as clients see them-> - * Updates the live aborts, removes those that are dead and sets them dead-> - * Check that the last message seen is correct and that there is no mismatch of our own last message or that - * other clients have not had a rollback on the last message-> + * Update the last message seen table-> Update and set dead the + * appropriate RejectedMessages as clients see them-> Updates the live + * aborts, removes those that are dead and sets them dead-> Check that + * the last message seen is correct and that there is no mismatch of + * our own last message or that other clients have not had a rollback + * on the last message-> */ -void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset *machineSet) { - +void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset *machineSet) { // We have seen this machine ID machineSet->remove(machineId); // Get the set of rejected messages that this machine Id is has not seen yet Hashset *watchset = rejectedMessageWatchVectorTable->get(machineId); - // If there is a rejected message that this machine Id has not seen yet if (watchset != NULL) { + // Go through each rejected message that this machine Id has not + // seen yet - // Go through each rejected message that this machine Id has not seen yet - for (Iterator *rmit = watchset->iterator(); rmit->hasNext(); ) { - + SetIterator *rmit = watchset->iterator(); + while (rmit->hasNext()) { RejectedMessage *rm = rmit->next(); - // If this machine Id has seen this rejected message->->-> if (rm->getSequenceNumber() <= seqNum) { - // Remove it from our watchlist rmit->remove(); - // Decrement machines that need to see this notification rm->removeWatcher(machineId); } } + delete rmit; } // Set dead the abort - for (IteratorEntry *, Abort *> > i = liveAbortTable->entrySet()->iterator(); i->hasNext();) { - Abort *abort = i->next()->getValue(); + SetIterator *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> *abortit = getKeyIterator(liveAbortTable); + while (abortit->hasNext()) { + Pair *key = abortit->next(); + Abort *abort = liveAbortTable->get(key); if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) { abort->setDead(); - i->remove(); - + abortit->remove(); if (abort->getTransactionArbitrator() == localMachineId) { liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber()); } } } - - - + delete abortit; if (machineId == localMachineId) { // Our own messages are immediately dead-> - if (liveness instanceof LastMessage) { + char livenessType = liveness->getType(); + if (livenessType == TypeLastMessage) { ((LastMessage *)liveness)->setDead(); - } else if (liveness instanceof Slot) { + } else if (livenessType == TypeSlot) { ((Slot *)liveness)->setDead(); } else { throw new Error("Unrecognized type"); } } - // Get the old last message for this device Pair *lastMessageEntry = lastMessageTable->put(machineId, new Pair(seqNum, liveness)); if (lastMessageEntry == NULL) { @@ -2555,31 +2808,31 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liven int64_t lastMessageSeqNum = lastMessageEntry->getFirst(); Liveness *lastEntry = lastMessageEntry->getSecond(); + delete lastMessageEntry; // If it is not our machine Id since we already set ours to dead if (machineId != localMachineId) { - if (lastEntry instanceof LastMessage) { + char lastEntryType = lastEntry->getType(); + + if (lastEntryType == TypeLastMessage) { ((LastMessage *)lastEntry)->setDead(); - } else if (lastEntry instanceof Slot) { + } else if (lastEntryType == TypeSlot) { ((Slot *)lastEntry)->setDead(); } else { throw new Error("Unrecognized type"); } } - // Make sure the server is not playing any games if (machineId == localMachineId) { - if (hadPartialSendToServer) { // We were not making any updates and we had a machine mismatch if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) { - throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: " + lastMessageSeqNum + " got: " + seqNum); + throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: "); } - } else { // We were not making any updates and we had a machine mismatch if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) { - throw new Error("Server Error: Mismatch on local machine sequence number, needed: " + lastMessageSeqNum + " got: " + seqNum); + throw new Error("Server Error: Mismatch on local machine sequence number, needed: "); } } } else { @@ -2590,8 +2843,9 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liven } /** - * Add a rejected message entry to the watch set to keep track of which clients have seen that - * rejected message entry and which have not-> + * Add a rejected message entry to the watch set to keep track of + * which clients have seen that rejected message entry and which have + * not. */ void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) { Hashset *entries = rejectedMessageWatchVectorTable->get(machineId); @@ -2607,12 +2861,11 @@ void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) { * Check if the HMAC chain is not violated */ void Table::checkHMACChain(SlotIndexer *indexer, Array *newSlots) { - for (int i = 0; i < newSlots->length; i++) { - Slot *currSlot = newSlots[i]; + for (uint i = 0; i < newSlots->length(); i++) { + Slot *currSlot = newSlots->get(i); Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1); if (prevSlot != NULL && !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC())) throw new Error("Server Error: Invalid HMAC Chain"); } } -